diff --git a/eval/final/components.py b/eval/final/components.py index e4d9036d..7a650edf 100644 --- a/eval/final/components.py +++ b/eval/final/components.py @@ -17,7 +17,6 @@ def run_final_eval_op( mmlu_branch_output: Output[Artifact], mt_bench_branch_output: Output[Artifact], - candidate_model: str, base_model_dir: str, tasks: Input[Dataset], taxonomy: Input[Dataset], @@ -29,6 +28,7 @@ def run_final_eval_op( few_shots: int, batch_size: int, merge_system_user_message: bool, + candidate_model: str = None, ): import json import os @@ -43,6 +43,11 @@ def run_final_eval_op( from instructlab.eval.mt_bench import MTBenchBranchEvaluator from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores, sort_score + # For standalone mode + if candidate_model is None: + # logic to get the best model from the models folder and results + pass + ###################################################################### # branch_eval_summary_to_json creates a json object from output of instructlab/eval # TODO: Add this to the instructlab/eval or instructlab/instructlab repository @@ -221,7 +226,7 @@ def find_node_dataset_directories(base_directory: str): ###################################################################### # TODO: Update ilab/model/evaluate evaluate def logic to allow for external judge model - # and when that happens, much of this logic can be imported from the `evaluate` definition: + # and when that happens, much of this logic can be imported from the 'evaluate' definition: # https://github.com/instructlab/instructlab/blob/83ca501ecdd858677380046e2a56da5b2f3f14e7/src/instructlab/model/evaluate.py#L504 # # With instructlab, model_name is synonomous with model_path @@ -244,8 +249,8 @@ def find_node_dataset_directories(base_directory: str): ), ] - # ilab/evaluate uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # ilab/evaluate uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 if max_workers == "auto": try: diff --git a/eval/mt_bench/components.py b/eval/mt_bench/components.py index 429f4b2a..17beffdf 100644 --- a/eval/mt_bench/components.py +++ b/eval/mt_bench/components.py @@ -12,8 +12,8 @@ def run_mt_bench_op( models_path_prefix: str, mt_bench_output: Output[Artifact], merge_system_user_message: bool, - # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # generate_answers,judgment uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 max_workers: str, models_list: List[str] = None, @@ -53,8 +53,8 @@ def run_mt_bench_op( scores = {} all_mt_bench_data = [] - # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # generate_answers,judgment uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 if max_workers == "auto": try: diff --git a/pipeline.py b/pipeline.py index 9b5000de..9b2b35d7 100644 --- a/pipeline.py +++ b/pipeline.py @@ -348,7 +348,7 @@ def pipeline( final_eval_task.set_accelerator_type("nvidia.com/gpu") final_eval_task.set_accelerator_limit(1) - # Technically `output_model_task` and `output_data_task` can happen before evaluation, + # Technically 'output_model_task' and 'output_data_task' can happen before evaluation, # however the PVC can only be mounted once, so, setting these to _after_ so the eval proceeds. output_model_task = pvc_to_artifact_op( pvc_path="/output/data", @@ -417,7 +417,7 @@ def gen_standalone(): This function should be used when Kubeflow Pipelines are not available. It will generate a script that replicates the pipeline's functionality. - Example usage: ``` $ python pipeline.py gen-standalone ``` + Example usage: ''' $ python pipeline.py gen-standalone ''' """ from os import path @@ -442,11 +442,11 @@ def gen_standalone(): # The list of executor names to extract details from to generate the standalone script executors = { - "exec-data-processing-op": 'data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/input_data/generated", model="/input_model", processed_data="/input_data/processed_data")', - "exec-sdg-op": 'sdg_op(num_instructions_to_generate=2, repo_branch="", repo_pr="", taxonomy="/input_data/taxonomy", sdg="/input_data/generated")', + "exec-data-processing-op": 'data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/data/data", model="/data/model", processed_data="/data/processed_data")', + "exec-sdg-op": 'sdg_op(num_instructions_to_generate=2, repo_branch="", repo_pr="", taxonomy="/data/taxonomy", sdg="/data/generated")', "exec-git-clone-op": {}, - "exec-huggingface-importer-op": 'huggingface_importer_op(repo_name="ibm-granite/granite-7b-base", model="/input_model")', - "exec-run-mt-bench-op": 'run_mt_bench_op(mt_bench_output="/output/mt-bench-results.txt", models_list="/output/model/model/hf_format", models_path_prefix="/output/model/hf_format", max_workers="auto", merge_system_user_message=False)', + "exec-huggingface-importer-op": 'huggingface_importer_op(repo_name="ibm-granite/granite-7b-base", model="/data/model")', + "exec-run-mt-bench-op": 'run_mt_bench_op(mt_bench_output="/data/mt-bench-results.txt", models_folder="/data/model/output/hf_format", models_path_prefix="/data/model/output/hf_format", max_workers="auto", merge_system_user_message=False)', } details = {} @@ -621,9 +621,18 @@ def change_dsl_function_to_normal_function(rendered_code: list): "import kfp": "", "from kfp import dsl": "", "from kfp.dsl import *": "", - ".path": "", # super hacky, but works for now, the idea is that "taxonomy.path" is a string so we just remove the ".path" part } + import re + + # Regular expression to match ".path" but not "os.path" + path_pattern = re.compile(r"(? None:\n \ \ # early validation logic here\n if train_args.max_batch_len\ \ < train_args.max_seq_len:\n raise ValueError(\n \ - \ f\"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=}\ + \ f\"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=}\ \ < {train_args.max_seq_len=}\"\n )\n\n # process\ \ the training data\n if not os.path.exists(train_args.data_output_dir):\n\ \ os.makedirs(train_args.data_output_dir, exist_ok=True)\n \ @@ -1008,16 +1009,18 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef run_final_eval_op(\n mmlu_branch_output: Output[Artifact],\n\ - \ mt_bench_branch_output: Output[Artifact],\n candidate_model: str,\n\ - \ base_model_dir: str,\n tasks: Input[Dataset],\n taxonomy: Input[Dataset],\n\ - \ base_branch: str,\n candidate_branch: str,\n max_workers: str,\n\ - \ device: str,\n model_dtype: str,\n few_shots: int,\n batch_size:\ - \ int,\n merge_system_user_message: bool,\n):\n import json\n import\ + \ mt_bench_branch_output: Output[Artifact],\n base_model_dir: str,\n\ + \ tasks: Input[Dataset],\n taxonomy: Input[Dataset],\n base_branch:\ + \ str,\n candidate_branch: str,\n max_workers: str,\n device: str,\n\ + \ model_dtype: str,\n few_shots: int,\n batch_size: int,\n merge_system_user_message:\ + \ bool,\n candidate_model: str = None,\n):\n import json\n import\ \ os\n\n import torch\n from helpers import (\n VLLM_SERVER,\n\ \ launch_vllm,\n stop_vllm,\n )\n from instructlab.eval.mmlu\ \ import MMLU_TASKS, MMLUBranchEvaluator\n from instructlab.eval.mt_bench\ \ import MTBenchBranchEvaluator\n from instructlab.model.evaluate import\ - \ qa_pairs_to_qna_to_avg_scores, sort_score\n\n ######################################################################\n\ + \ qa_pairs_to_qna_to_avg_scores, sort_score\n\n # For standalone mode\n\ + \ if candidate_model is None:\n # logic to get the best model\ + \ from the models folder and results\n pass\n\n ######################################################################\n\ \ # branch_eval_summary_to_json creates a json object from output of\ \ instructlab/eval\n # TODO: Add this to the instructlab/eval or instructlab/instructlab\ \ repository\n def branch_eval_summary_to_json(\n improvements:\ @@ -1107,7 +1110,7 @@ deploymentSpec: main\"\n\n ######################################################################\n\ \ # TODO: Update ilab/model/evaluate evaluate def logic to allow for\ \ external judge model\n # and when that happens, much of this logic\ - \ can be imported from the `evaluate` definition:\n # https://github.com/instructlab/instructlab/blob/83ca501ecdd858677380046e2a56da5b2f3f14e7/src/instructlab/model/evaluate.py#L504\n\ + \ can be imported from the 'evaluate' definition:\n # https://github.com/instructlab/instructlab/blob/83ca501ecdd858677380046e2a56da5b2f3f14e7/src/instructlab/model/evaluate.py#L504\n\ \ #\n # With instructlab, model_name is synonomous with model_path\n\ \ mt_bench_evaluators = [\n MTBenchBranchEvaluator(\n \ \ model_name=candidate_model,\n judge_model_name=judge_model_name,\n\ @@ -1118,7 +1121,7 @@ deploymentSpec: \ branch=base_branch,\n output_dir=output_dir,\n \ \ merge_system_user_message=merge_system_user_message,\n \ \ ),\n ]\n\n # ilab/evaluate uses a magic word for its mt_bench\ - \ evaluator - `auto`\n # with `auto`, number of gpus allocated for serving\ + \ evaluator - 'auto'\n # with 'auto', number of gpus allocated for serving\ \ is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ \ if max_workers == \"auto\":\n try:\n usable_cpu_count\ \ = len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n\ @@ -1197,7 +1200,7 @@ deploymentSpec: - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef run_mt_bench_op(\n models_path_prefix: str,\n mt_bench_output:\ \ Output[Artifact],\n merge_system_user_message: bool,\n # generate_answers,judgment\ - \ uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`,\ + \ uses a magic word for its mt_bench evaluator - 'auto'\n # with 'auto',\ \ number of gpus allocated for serving is calculated based on environment\n\ \ # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ \ max_workers: str,\n models_list: List[str] = None,\n models_folder:\ @@ -1215,7 +1218,7 @@ deploymentSpec: \n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n judge_model_name\ \ = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"JUDGE_ENDPOINT\"\ )\n\n scores = {}\n all_mt_bench_data = []\n\n # generate_answers,judgment\ - \ uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`,\ + \ uses a magic word for its mt_bench evaluator - 'auto'\n # with 'auto',\ \ number of gpus allocated for serving is calculated based on environment\n\ \ # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ \ if max_workers == \"auto\":\n try:\n usable_cpu_count\ @@ -1286,7 +1289,7 @@ deploymentSpec: \ > 0) else \"empty\"\n\n print(\"Generating syntetic dataset for:\"\ )\n print()\n print(read_taxonomy(taxonomy.path, taxonomy_base))\n\ \n # generate_data has a magic word for its taxonomy_base argument -\ - \ `empty`\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n\ + \ 'empty'\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n\ \ generate_data(\n client=client,\n num_instructions_to_generate=num_instructions_to_generate,\n\ \ output_dir=sdg.path,\n taxonomy=taxonomy.path,\n \ \ taxonomy_base=taxonomy_base,\n model_name=model,\n chunk_word_count=1000,\n\ diff --git a/sdg/components.py b/sdg/components.py index 188aced1..d3607b6d 100644 --- a/sdg/components.py +++ b/sdg/components.py @@ -52,7 +52,7 @@ def sdg_op( print() print(read_taxonomy(taxonomy.path, taxonomy_base)) - # generate_data has a magic word for its taxonomy_base argument - `empty` + # generate_data has a magic word for its taxonomy_base argument - 'empty' # it allows generating from the whole repo, see: # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230 generate_data( diff --git a/standalone/README.md b/standalone/README.md index 9a5cbb0d..d98e1eae 100644 --- a/standalone/README.md +++ b/standalone/README.md @@ -98,6 +98,10 @@ The script requires information regarding the location and method for accessing * `--eval-serving-model-name`: The name of the model to use for evaluation. **Required** * `--eval-serving-model-api-key`: The API key for the model to evaluate. `EVAL_SERVING_MODEL_API_KEY` environment variable can be used as well. **Required** +* `--force-pull`: Force pull the data (sdg data and model) from the object store even if it already + exists in the PVC. **Optional** - Default: false. +* `--training-1-epoch-num`: The number of epochs to train the model for phase 1. **Optional** - Default: 7. +* `--training-2-epoch-num`: The number of epochs to train the model for phase 2. **Optional** - Default: 10. ## Example End-To-End Workflow diff --git a/standalone/standalone.py b/standalone/standalone.py index 1b96634e..788cd258 100755 --- a/standalone/standalone.py +++ b/standalone/standalone.py @@ -26,6 +26,7 @@ import logging import typing from urllib.parse import urlparse +from os import path import click import kubernetes @@ -46,20 +47,17 @@ K8S_NAME = "kfp-model-server" TOOLBOX_IMAGE = "registry.access.redhat.com/ubi9/toolbox" PYTHON_IMAGE = "registry.access.redhat.com/ubi9/python-311:latest" -SDG_PVC_NAME = "sdg-data" -SDG_PVC_MOUNT_PATH = "/input_data" -SDG_VOLUME_NAME = "input-data" -MODEL_PVC_NAME = "model" -MODEL_PVC_MOUNT_PATH = "/input_model" -MODEL_VOLUME_NAME = "model" -TAXONOMY_PATH = SDG_PVC_MOUNT_PATH + "/taxonomy" -TRAINING_PVC_NAME = "training-data" -TRAINING_PVC_MOUNT_PATH = "/output" -TRAINING_VOLUME_NAME = "output" +DATA_PVC_NAME = "data" +DATA_PVC_MOUNT_PATH = "/data" +DATA_PVC_MODEL_PATH = path.join(DATA_PVC_MOUNT_PATH, "model") +DATA_VOLUME_NAME = "data" +TAXONOMY_PATH = path.join(DATA_PVC_MOUNT_PATH, "taxonomy") +DATA_PVC_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "output") +DATA_PVC_OUTPUT_DATA_PATH = path.join(DATA_PVC_OUTPUT_PATH, "data") PYTORCH_NNODES = 2 -PYTORCH_IMAGE = "quay.io/shanand/test-train:0.0.4" +PYTORCH_IMAGE = "registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.1-1724960989" # MMLU_SCORES_PATH = "/output/mmlu-results.txt" -MT_BENCH_SCORES_PATH = "/output/mt-bench-results.txt" +MT_BENCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-results.txt") SDG_OBJECT_STORE_SECRET_NAME = "sdg-object-store-credentials" KFP_MODEL_SERVER_CM = """ # TODO: remove the following line and replace it with the actual ConfigMap/Secret @@ -86,7 +84,7 @@ kind: ConfigMap apiVersion: v1 metadata: - name: {EVAL_SERVING_NAME} + name: {eval_serving_name} data: endpoint: {eval_serving_endpoint} model: {eval_serving_model_name} @@ -94,7 +92,7 @@ apiVersion: v1 kind: Secret metadata: - name: {EVAL_SERVING_NAME} + name: {eval_serving_name} type: Opaque stringData: api_key: {eval_serving_model_api_key} @@ -119,9 +117,38 @@ containers: - args: - | - mkdir -p /output/model; - mkdir -p /output/data; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /output/model --data_output_dir /input_data/processed_data + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" + mkdir -p /data/model; + mkdir -p /data/data; + mkdir -p {path_to_model}/output + export XDG_CACHE_HOME=/tmp + export TRITON_CACHE_DIR=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir={path_to_model}/output \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=1e-4 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' @@ -129,17 +156,11 @@ image: {PYTORCH_IMAGE} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -150,15 +171,9 @@ cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data + - name: data persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} Worker: replicas: {worker_replicas} restartPolicy: OnFailure @@ -170,8 +185,36 @@ containers: - args: - | + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" mkdir -p /tmp/model; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /tmp/model --data_output_dir /input_data/processed_data + export TRITON_CACHE_DIR=/tmp + export XDG_CACHE_HOME=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir=/tmp/model \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=2e-6 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' @@ -179,18 +222,11 @@ image: {PYTORCH_IMAGE} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output - readOnly: true + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -201,20 +237,26 @@ cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model + - name: data persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} """ # TODO: support signature version? -SDG_DATA_SCRIPT = """ +DATA_SCRIPT = """ set -e +FORCE_PULL={force_pull} +if [ -s {data_pvc_mount_path}/data.tar.gz ] && [ -d {data_pvc_mount_path}/data ] && [ -d {data_pvc_mount_path}/model ] ; then + echo "Data tarball and sdg/model directories already exist in the PVC. Skipping download." + if [ "$FORCE_PULL" == "None" ] || [ "$FORCE_PULL" == "False" ]; then + echo "'--force-pull' is not set - will not force pull the data from the object store" + ls -laR {data_pvc_mount_path} + exit 0 + else + echo "'--force-pull' is set to true - will force pull the data from the object store" + fi +fi + export STRATEGY={strategy} if [ -z "$STRATEGY" ] || [ "$STRATEGY" == "None" ]; then @@ -257,7 +299,7 @@ def download_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') - output_file = '{MODEL_PVC_MOUNT_PATH}/data.tar.gz' + output_file = '{data_pvc_mount_path}/data.tar.gz' s3.download_file(bucket_name, s3_key, output_file) @@ -266,7 +308,7 @@ def upload_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') # TODO: change the name for the model name - input_file = '{MODEL_PVC_MOUNT_PATH}/data.tar.gz' # TODO: change for model path + input_file = '{data_pvc_mount_path}/data.tar.gz' # TODO: change for model path s3.upload_file(input_file, bucket_name, s3_key) @@ -285,13 +327,10 @@ def upload_s3_file(): if [ "$STRATEGY" == "download" ]; then # List top-level directories only (no nested directories) - top_level_dirs=$(tar --exclude='*/*' --list --file {MODEL_PVC_MOUNT_PATH}/data.tar.gz) - - # List of directories we expect in the archive - expected_dirs=("data" "model") + top_level_dirs=$(tar --exclude='*/*' --list --file {data_pvc_mount_path}/data.tar.gz) # Loop through the expected directories and check if they exist in the archive - for dir in "${expected_dirs[@]}"; do + for dir in data model; do if ! echo "$top_level_dirs" | grep -q "^$dir/$"; then echo "Archive does not contain a '$dir' directory" exit 1 @@ -299,13 +338,9 @@ def upload_s3_file(): done echo "All expected directories are present." - # First extract SDG data in the SDG PVC - mkdir -p {SDG_PVC_MOUNT_PATH}/generated - tar -C {SDG_PVC_MOUNT_PATH}/generated -xf data.tar.gz --strip-components=1 data/ - - # Then extract the model in the model PVC - mkdir -p {MODEL_PVC_MOUNT_PATH}/model - tar -C {MODEL_PVC_MOUNT_PATH} -xf {MODEL_PVC_MOUNT_PATH}/data.tar.gz --strip-components=1 model/ + echo "Extracting data from the archive" + tar -C {data_pvc_mount_path} -xvf {data_pvc_mount_path}/data.tar.gz + ls -laR {data_pvc_mount_path} fi """ @@ -344,6 +379,7 @@ def upload_s3_file(): PYTHON_EXECUTOR = """ set -e +export XDG_CACHE_HOME=/tmp tmp=$(mktemp -d) cat < "$tmp"/exec.py @@ -462,9 +498,7 @@ def show( @cli.group(invoke_without_command=True) -@click.option( - "--namespace", type=str, default="default", help="Kubernetes namespace to use" -) +@click.option("--namespace", type=str, help="Kubernetes namespace to use") @click.option( "--taxonomy-repo-url", type=str, @@ -544,7 +578,11 @@ def show( ) @click.option( "--model-to-train", - help="Path to model to train (PVC filesystem path)", + help=( + "Path to model to train (PVC filesystem path). " + "Useful when calling training phases independently and users wants to point to the epoch directory. " + "Very advanced usage, not recommended for general use." + ), type=str, ) @click.option( @@ -614,10 +652,24 @@ def show( ), type=str, ) +@click.option( + "--force-pull", + help="Force pull the data (sdg data and model) from the object store even if it already exists in the PVC.", + is_flag=True, + default=False, +) +@click.option( + "--training-1-epoch-num", help="Number of epochs to train the model for.", default=7 +) +@click.option( + "--training-2-epoch-num", + help="Number of epochs to train the model for.", + default=10, +) @click.pass_context def run( ctx: click.Context, - namespace: typing.Optional[str] = "default", + namespace: typing.Optional[str] = None, taxonomy_repo_url: str = "", taxonomy_repo_branch: typing.Optional[str] = "", taxonomy_repo_pr: typing.Optional[str] = "", @@ -639,6 +691,9 @@ def run( sdg_object_store_data_key: typing.Optional[str] = None, sdg_object_store_verify_tls: typing.Optional[bool] = None, sdg_object_store_secret: typing.Optional[str] = None, + force_pull: typing.Optional[bool] = False, + training_1_epoch_num: int = 7, + training_2_epoch_num: int = 10, ): """ Execute the distributed training on Kubernetes. @@ -666,6 +721,9 @@ def run( sdg_object_store_data_key (str): The name of the tarball that contains SDG data. sdg_object_store_verify_tls (bool): Verify TLS for the object store. sdg_object_store_secret (str): The name of the Kubernetes Secret containing the SDG object store credentials. The namespace is inferred from the namespace option. + force_pull (bool): Force pull the data (sdg data and model) from the object store even if it already exists in the PVC. + training_1_epoch_num (int): Number of epochs to train the model for during phase 1. + training_2_epoch_num (int): Number of epochs to train the model for during phase 2. Returns: None @@ -693,6 +751,9 @@ def run( ctx.obj["sdg_object_store_data_key"] = sdg_object_store_data_key ctx.obj["sdg_object_store_verify_tls"] = sdg_object_store_verify_tls ctx.obj["sdg_object_store_secret"] = sdg_object_store_secret + ctx.obj["force_pull"] = force_pull + ctx.obj["training_1_epoch_num"] = training_1_epoch_num + ctx.obj["training_2_epoch_num"] = training_2_epoch_num ########################## # MAIN WORKFLOW SEQUENCE # @@ -722,11 +783,19 @@ def run( # ctx.obj["model_to_train"] = best_model.get("model") # Training Phase 2 - # ctx.invoke(train) + ctx.obj["training_phase"] = "2" + ctx.invoke(train) # Evaluation of phase 2 with MT-Bench - # ctx.obj["eval_type"] = "mt-bench" - # _ = ctx.invoke(evaluation) + ctx.obj["eval_type"] = "mt-bench" + scores = ctx.invoke(evaluation) + scores = json.loads(scores) + best_model = max(scores, key=lambda x: x["average_score"]) + logger.info("Best model: %s", best_model.get("model")) + ctx.obj["candidate_model"] = best_model.get("model") + + # Final evaluation + # TODO def get_security_context() -> kubernetes.client.V1SecurityContext: @@ -739,33 +808,27 @@ def get_security_context() -> kubernetes.client.V1SecurityContext: ) -def get_sdg_vol_mount() -> kubernetes.client.V1VolumeMount: +def get_vol_mount() -> list[kubernetes.client.V1VolumeMount]: """ Get the volume mount for the SDG job. """ return [ kubernetes.client.V1VolumeMount( - name=SDG_VOLUME_NAME, mount_path=SDG_PVC_MOUNT_PATH - ), - kubernetes.client.V1VolumeMount( - name=MODEL_VOLUME_NAME, mount_path=MODEL_PVC_MOUNT_PATH - ), - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH + name=DATA_VOLUME_NAME, mount_path=DATA_PVC_MOUNT_PATH ), ] -def get_fetch_sdg_vol_mount() -> kubernetes.client.V1VolumeMount: +def get_vol() -> list[kubernetes.client.V1Volume]: """ - Get the volume mount for the SDG job. + Get the volume for the SDG job. """ return [ - kubernetes.client.V1VolumeMount( - name=SDG_VOLUME_NAME, mount_path=SDG_PVC_MOUNT_PATH - ), - kubernetes.client.V1VolumeMount( - name=MODEL_VOLUME_NAME, mount_path=MODEL_PVC_MOUNT_PATH + kubernetes.client.V1Volume( + name=DATA_VOLUME_NAME, + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=DATA_PVC_NAME + ), ), ] @@ -827,7 +890,7 @@ def sdg_op( print() print(read_taxonomy(taxonomy, taxonomy_base)) - # generate_data has a magic word for its taxonomy_base argument - `empty` + # generate_data has a magic word for its taxonomy_base argument - 'empty' # it allows generating from the whole repo, see: # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230 generate_data( @@ -842,7 +905,7 @@ def sdg_op( ) """ exec_sdg_op_args = """ -sdg_op(num_instructions_to_generate=2, repo_branch="", repo_pr="", taxonomy="/input_data/taxonomy", sdg="/input_data/generated") +sdg_op(num_instructions_to_generate=2, repo_branch="", repo_pr="", taxonomy="/data/taxonomy", sdg="/data/generated") """ exec_huggingface_importer_op_command = """ @@ -854,7 +917,7 @@ def huggingface_importer_op(model: str, repo_name: str): snapshot_download(repo_id=repo_name, cache_dir="/tmp", local_dir=model) """ exec_huggingface_importer_op_args = """ -huggingface_importer_op(repo_name="ibm-granite/granite-7b-base", model="/input_model") +huggingface_importer_op(repo_name="ibm-granite/granite-7b-base", model="/data/model") """ exec_data_processing_op_command = """ @@ -900,11 +963,11 @@ def data_processing(train_args: TrainingArgs) -> None: # early validation logic here if train_args.max_batch_len < train_args.max_seq_len: raise ValueError( - f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}" + f"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=} < {train_args.max_seq_len=}" ) # process the training data - if not os.exists(train_args.data_output_dir): + if not os.path.exists(train_args.data_output_dir): os.makedirs(train_args.data_output_dir, exist_ok=True) dp.main( DataProcessArgs( @@ -925,7 +988,7 @@ def data_processing(train_args: TrainingArgs) -> None: data_processing(train_args=training_args) """ exec_data_processing_op_args = """ -data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/input_data/generated", model="/input_model", processed_data="/input_data/processed_data") +data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/data/data", model="/data/model", processed_data="/data/processed_data") """ init_containers = [ @@ -936,7 +999,7 @@ def data_processing(train_args: TrainingArgs) -> None: args=[ 'git clone {exec_git_clone_op_repo_url} {TAXONOMY_PATH} && cd {TAXONOMY_PATH} && if [ -n "{exec_git_clone_op_repo_branch}" ]; then git fetch origin {exec_git_clone_op_repo_branch} && git checkout {exec_git_clone_op_repo_branch}; elif [ -n "{exec_git_clone_op_repo_pr}" ] && [ {exec_git_clone_op_repo_pr} -gt 0 ]; then git fetch origin pull/{exec_git_clone_op_repo_pr}/head:{exec_git_clone_op_repo_pr} && git checkout {exec_git_clone_op_repo_pr}; fi ' ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), kubernetes.client.V1Container( @@ -950,7 +1013,7 @@ def data_processing(train_args: TrainingArgs) -> None: python_main=exec_sdg_op_args.strip(), ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -971,7 +1034,7 @@ def data_processing(train_args: TrainingArgs) -> None: python_main=exec_huggingface_importer_op_args.strip(), ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -992,7 +1055,7 @@ def data_processing(train_args: TrainingArgs) -> None: python_main=exec_data_processing_op_args.strip(), ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), ] @@ -1014,30 +1077,13 @@ def data_processing(train_args: TrainingArgs) -> None: name="copy-model-to-pvc", image=TOOLBOX_IMAGE, command=["/bin/sh", "-c"], - args=[f"cp -r -v {MODEL_PVC_MOUNT_PATH} {TRAINING_PVC_MOUNT_PATH}"], - volume_mounts=get_sdg_vol_mount(), + args=[ + f"cp -r -v {DATA_PVC_MOUNT_PATH} {DATA_PVC_MOUNT_PATH}" + ], # TODO: fix me, dumb line to pass linter, this feat is unused anyway + volume_mounts=get_vol_mount(), ) - volumes = [ - kubernetes.client.V1Volume( - name=SDG_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=SDG_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=MODEL_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=MODEL_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] + volumes = get_vol() # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( @@ -1070,6 +1116,7 @@ def create_sdg_data_fetch_job( namespace: str, job_name: str, sdg_object_store_secret: str, + force_pull: bool = False, ) -> kubernetes.client.V1Job: """ Create a Kubernetes Job object. @@ -1085,107 +1132,189 @@ def create_sdg_data_fetch_job( kubernetes.client.V1Job: A Kubernetes Job object configured with the specified parameters. """ - container = kubernetes.client.V1Container( - name="fetch-sdg-files-from-object-store", - image=PYTHON_IMAGE, - command=["/bin/sh", "-c"], - args=[ - SDG_DATA_SCRIPT.format( - strategy="download", - MODEL_PVC_MOUNT_PATH=MODEL_PVC_MOUNT_PATH, # TODO: DOWNLOAD ON THE MODEL PVC!! + exec_data_processing_op_command = """ +from typing import * + +def data_processing_op( + sdg: str, + processed_data: str, + model: str, + max_seq_len: Optional[int] = 4096, + max_batch_len: Optional[int] = 20000, +): + import os + + import instructlab.training.data_process as dp + from instructlab.training import ( + DataProcessArgs, + TrainingArgs, + ) + + # define training-specific arguments + training_args = TrainingArgs( + # define data-specific arguments + model_path=model, + data_path=f"{sdg}/*_train_msgs*.jsonl", + data_output_dir=processed_data, + # define model-trianing parameters + max_seq_len=max_seq_len, + max_batch_len=max_batch_len, + # XXX(shanand): We don't need the following arguments + # for data processing. Added them for now to avoid + # Pydantic validation errors for TrainingArgs + ckpt_output_dir="data/saved_checkpoints", + num_epochs=2, + effective_batch_size=3840, + save_samples=0, + learning_rate=2e-6, + warmup_steps=800, + is_padding_free=True, + ) + + def data_processing(train_args: TrainingArgs) -> None: + # early validation logic here + if train_args.max_batch_len < train_args.max_seq_len: + raise ValueError( + f"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=} < {train_args.max_seq_len=}" ) - ], - volume_mounts=get_fetch_sdg_vol_mount(), - env=[ - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ENDPOINT", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="endpoint", optional=True - ) - ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_BUCKET", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="bucket", optional=False - ) + + # process the training data + if not os.path.exists(train_args.data_output_dir): + os.makedirs(train_args.data_output_dir, exist_ok=True) + dp.main( + DataProcessArgs( + # XXX(osilkin): make a decision here, either: + # 1. the CLI is fully responsible for managing where the data is written + # 2. we never cache it and simply write it to a tmp file every time. + # + # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux + # where the user has a defined place for new temporary data to be written. + data_output_path=train_args.data_output_dir, + model_path=train_args.model_path, + data_path=train_args.data_path, + max_seq_len=train_args.max_seq_len, + chat_tmpl_path=train_args.chat_tmpl_path, + ) + ) + + data_processing(train_args=training_args) +""" + exec_data_processing_op_args = """ +data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/data/data", model="/data/model", processed_data="/data/processed_data") +""" + + init_containers = [ + kubernetes.client.V1Container( + name="fetch-sdg-files-from-object-store", + # image=PYTHON_IMAGE, + image="quay.io/opendatahub/workbench-images:jupyter-datascience-ubi9-python-3.11-20241004-609ffb8", + command=["/bin/sh", "-c"], + args=[ + DATA_SCRIPT.format( + strategy="download", + force_pull=force_pull, + data_pvc_mount_path=DATA_PVC_MOUNT_PATH, + ) + ], + volume_mounts=get_vol_mount(), + env=[ + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ENDPOINT", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="endpoint", optional=True + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ACCESS_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="access_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_BUCKET", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="bucket", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_SECRET_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="secret_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ACCESS_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="access_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_REGION", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="region", optional=True - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_SECRET_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="secret_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_DATA_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="data_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_REGION", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="region", optional=True + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_MODEL_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="model_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_DATA_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="data_key", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_VERIFY_TLS", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="verify_tls", optional=True - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_VERIFY_TLS", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="verify_tls", + optional=True, + ) + ), ), + ], + ) + ] + + container = kubernetes.client.V1Container( + name="sdg-op-generate-synthetic-data", + # image="quay.io/tcoufal/ilab-sdg:latest", + image="registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.1-1724960989", + command=["/bin/sh", "-ce"], + args=[ + PYTHON_EXECUTOR.format( + python_code=exec_data_processing_op_command, + python_main=exec_data_processing_op_args.strip(), ), ], - ) - - volumes = [ - kubernetes.client.V1Volume( - name=SDG_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=SDG_PVC_NAME + volume_mounts=get_vol_mount(), + security_context=get_security_context(), + env_from=[ + kubernetes.client.V1EnvFromSource( + config_map_ref=kubernetes.client.V1ConfigMapEnvSource(name=K8S_NAME) ), - ), - kubernetes.client.V1Volume( - name=MODEL_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=MODEL_PVC_NAME + kubernetes.client.V1EnvFromSource( + secret_ref=kubernetes.client.V1SecretEnvSource(name=K8S_NAME) ), - ), - ] + ], + ) # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(labels={"app": "sdg-data-fetch"}), spec=kubernetes.client.V1PodSpec( restart_policy="Never", + init_containers=init_containers, containers=[container], - volumes=volumes, + volumes=get_vol(), ), ) @@ -1256,8 +1385,8 @@ def run_mt_bench_op( models_path_prefix: str, mt_bench_output: Output[Artifact], merge_system_user_message: bool, - # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # generate_answers,judgment uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 max_workers: str, models_list: List[str] = None, @@ -1297,8 +1426,8 @@ def run_mt_bench_op( scores = {} all_mt_bench_data = [] - # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # generate_answers,judgment uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 if max_workers == "auto": try: @@ -1358,7 +1487,7 @@ def run_mt_bench_op( return outputs(best_model=best_model, best_score=best_score) """ exec_run_mt_bench_op_args = """ -run_mt_bench_op(mt_bench_output="/output/mt-bench-results.txt", models_list="/output/model/model/hf_format", models_path_prefix="/output/model/hf_format", max_workers="auto", merge_system_user_message=False) +run_mt_bench_op(mt_bench_output="/data/mt-bench-results.txt", models_folder="/data/model/output/hf_format", models_path_prefix="/data/model/output/hf_format", max_workers="auto", merge_system_user_message=False) """ if eval_type == "mt-bench": @@ -1373,11 +1502,7 @@ def run_mt_bench_op( python_main=exec_run_mt_bench_op_args.strip(), ), ], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH - ), - ], + volume_mounts=get_vol_mount(), env_from=[ kubernetes.client.V1EnvFromSource( config_map_ref=kubernetes.client.V1ConfigMapEnvSource( @@ -1397,24 +1522,11 @@ def run_mt_bench_op( image="quay.io/sallyom/instructlab-ocp:eval-10-8", command=["/bin/sh", "-c"], args=[f"cat {MT_BENCH_SCORES_PATH}"], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH - ), - ], + volume_mounts=get_vol_mount(), ) else: raise ValueError(f"Unknown evaluation type: {eval_type}") - volumes = [ - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] - # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(labels={"app": "eval"}), @@ -1422,7 +1534,7 @@ def run_mt_bench_op( restart_policy="Never", init_containers=init_containers, containers=[container], - volumes=volumes, + volumes=get_vol(), ), ) @@ -1478,6 +1590,7 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: # Wait for the job to complete w = kubernetes.watch.Watch() + pod_log = None for event in w.stream(batch_v1.list_namespaced_job, namespace=namespace): job_event = event["object"] if job_event.metadata.name != job.metadata.name: @@ -1491,6 +1604,8 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: job.spec.template.metadata.labels["app"] ), ) + # On success return the logs of the last pod which contains the output + # (useful to get eval scores) pod_log = core_v1.read_namespaced_pod_log( name=pods.items[0].metadata.name, namespace=namespace ) @@ -1603,25 +1718,11 @@ def sdg( # list of PVCs to create and their details pvcs = [ { - "name": SDG_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteOnce"], - "size": "1Gi", - }, - { - "name": MODEL_PVC_NAME, + "name": DATA_PVC_NAME, "namespace": namespace, "storage_class": storage_class, "access_modes": ["ReadWriteOnce"], - "size": "50Gi", - }, - { - "name": TRAINING_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteMany"], - "size": "50Gi", + "size": "200Gi", }, ] for pvc in pvcs: @@ -1629,7 +1730,7 @@ def sdg( v1.create_namespaced_persistent_volume_claim( namespace=namespace, body=create_pvc(**pvc) ) - logger.info("Successfully creayed PVC '%s' created.", pvc.get("name")) + logger.info("Successfully created PVC '%s' created.", pvc.get("name")) except kubernetes.client.rest.ApiException as exc: if exc.status == 409: logger.info("PVC '%s' already exists.", pvc["name"]) @@ -1714,6 +1815,7 @@ def sdg_data_fetch( sdg_object_store_data_key = ctx.obj["sdg_object_store_data_key"] sdg_object_store_verify_tls = ctx.obj["sdg_object_store_verify_tls"] sdg_object_store_secret = ctx.obj["sdg_object_store_secret"] + force_pull = ctx.obj["force_pull"] # Make sure the endpoint is a valid URL validate_url(eval_serving_endpoint) @@ -1820,6 +1922,7 @@ def decode_base64(data): eval_serving_endpoint=eval_serving_endpoint, eval_serving_model_name=eval_serving_model_name, eval_serving_model_api_key=eval_serving_model_api_key, + eval_serving_name=EVAL_SERVING_NAME, ) ) ) @@ -1843,18 +1946,11 @@ def decode_base64(data): # list of PVCs to create and their details pvcs = [ { - "name": SDG_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteOnce"], - "size": "10Gi", # SDG Data set can be big so let's go with a safe size - }, - { - "name": MODEL_PVC_NAME, + "name": DATA_PVC_NAME, "namespace": namespace, "storage_class": storage_class, "access_modes": ["ReadWriteMany"], - "size": "100Gi", # Model can be big so let's go with a safe size + "size": "200Gi", # Allocate size for a few models and large SDG data sets }, ] for pvc in pvcs: @@ -1874,6 +1970,7 @@ def decode_base64(data): namespace=namespace, job_name="sdg-data-fetch", sdg_object_store_secret=sdg_object_store_secret, + force_pull=force_pull, ) # Run the job @@ -1893,27 +1990,35 @@ def train( training_phase = ctx.obj["training_phase"] path_to_model = ctx.obj["model_to_train"] nproc_per_node: int = ctx.obj["nproc_per_node"] + training_1_epoch_num: int = ctx.obj["training_1_epoch_num"] + training_2_epoch_num: int = ctx.obj["training_2_epoch_num"] if training_phase is None: raise ValueError("Training phase must be provided with --training-phase=[1|2]") # During the initial training if path_to_model is None: - path_to_model = "/input_model" + path_to_model = DATA_PVC_MODEL_PATH + + epoch_num = None + if training_phase == "1": + epoch_num = training_1_epoch_num + elif training_phase == "2": + epoch_num = training_2_epoch_num logger.info("Running multi-phased distributed training phase %s", training_phase) worker_replicas = PYTORCH_NNODES - 1 pytorch_training_job_yaml = yaml.safe_load( PYTORCH_TRAINING_JOB.format( - name="train-sdg", - model_pvc_name="model", - input_pvc_name="sdg-data", - output_pvc_name="training-data", + name=f"train-phase-{training_phase}", + data_pvc_name=DATA_PVC_NAME, path_to_model=path_to_model, nproc_per_node=nproc_per_node, - PYTORCH_NNODES=PYTORCH_NNODES, + nnodes=PYTORCH_NNODES, PYTORCH_IMAGE=PYTORCH_IMAGE, worker_replicas=worker_replicas, + epoch_num=epoch_num, + phase_num=training_phase, ) ) diff --git a/standalone/standalone.tpl b/standalone/standalone.tpl index 324a167b..33b8ee9e 100755 --- a/standalone/standalone.tpl +++ b/standalone/standalone.tpl @@ -26,6 +26,7 @@ import json import logging import typing from urllib.parse import urlparse +from os import path import click import kubernetes @@ -46,20 +47,17 @@ DEFAULT_REPO_URL = "https://github.com/instructlab/taxonomy.git" K8S_NAME = "kfp-model-server" TOOLBOX_IMAGE = "registry.access.redhat.com/ubi9/toolbox" PYTHON_IMAGE = "registry.access.redhat.com/ubi9/python-311:latest" -SDG_PVC_NAME = "sdg-data" -SDG_PVC_MOUNT_PATH = "/input_data" -SDG_VOLUME_NAME = "input-data" -MODEL_PVC_NAME = "model" -MODEL_PVC_MOUNT_PATH = "/input_model" -MODEL_VOLUME_NAME = "model" -TAXONOMY_PATH = SDG_PVC_MOUNT_PATH + "/taxonomy" -TRAINING_PVC_NAME = "training-data" -TRAINING_PVC_MOUNT_PATH = "/output" -TRAINING_VOLUME_NAME = "output" +DATA_PVC_NAME = "data" +DATA_PVC_MOUNT_PATH = "/data" +DATA_PVC_MODEL_PATH = path.join(DATA_PVC_MOUNT_PATH, "model") +DATA_VOLUME_NAME = "data" +TAXONOMY_PATH = path.join(DATA_PVC_MOUNT_PATH, "taxonomy") +DATA_PVC_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "output") +DATA_PVC_OUTPUT_DATA_PATH = path.join(DATA_PVC_OUTPUT_PATH, "data") PYTORCH_NNODES = 2 -PYTORCH_IMAGE = "quay.io/shanand/test-train:0.0.4" +PYTORCH_IMAGE = "registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.1-1724960989" # MMLU_SCORES_PATH = "/output/mmlu-results.txt" -MT_BENCH_SCORES_PATH = "/output/mt-bench-results.txt" +MT_BENCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-results.txt") SDG_OBJECT_STORE_SECRET_NAME = "sdg-object-store-credentials" KFP_MODEL_SERVER_CM = """ # TODO: remove the following line and replace it with the actual ConfigMap/Secret @@ -71,7 +69,7 @@ EVAL_SERVING_DETAILS = """ kind: ConfigMap apiVersion: v1 metadata: - name: {EVAL_SERVING_NAME} + name: {eval_serving_name} data: endpoint: {eval_serving_endpoint} model: {eval_serving_model_name} @@ -79,7 +77,7 @@ data: apiVersion: v1 kind: Secret metadata: - name: {EVAL_SERVING_NAME} + name: {eval_serving_name} type: Opaque stringData: api_key: {eval_serving_model_api_key} @@ -104,9 +102,38 @@ spec: containers: - args: - | - mkdir -p /output/model; - mkdir -p /output/data; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /output/model --data_output_dir /input_data/processed_data + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" + mkdir -p /data/model; + mkdir -p /data/data; + mkdir -p {path_to_model}/output + export XDG_CACHE_HOME=/tmp + export TRITON_CACHE_DIR=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir={path_to_model}/output \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=1e-4 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' @@ -114,17 +141,11 @@ spec: image: {PYTORCH_IMAGE} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -135,15 +156,9 @@ spec: cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data + - name: data persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} Worker: replicas: {worker_replicas} restartPolicy: OnFailure @@ -155,8 +170,36 @@ spec: containers: - args: - | + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" mkdir -p /tmp/model; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /tmp/model --data_output_dir /input_data/processed_data + export TRITON_CACHE_DIR=/tmp + export XDG_CACHE_HOME=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir=/tmp/model \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=2e-6 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' @@ -164,18 +207,11 @@ spec: image: {PYTORCH_IMAGE} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output - readOnly: true + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -186,20 +222,26 @@ spec: cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model + - name: data persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} """ # TODO: support signature version? -SDG_DATA_SCRIPT = """ +DATA_SCRIPT = """ set -e +FORCE_PULL={force_pull} +if [ -s {data_pvc_mount_path}/data.tar.gz ] && [ -d {data_pvc_mount_path}/data ] && [ -d {data_pvc_mount_path}/model ] ; then + echo "Data tarball and sdg/model directories already exist in the PVC. Skipping download." + if [ "$FORCE_PULL" == "None" ] || [ "$FORCE_PULL" == "False" ]; then + echo "'--force-pull' is not set - will not force pull the data from the object store" + ls -laR {data_pvc_mount_path} + exit 0 + else + echo "'--force-pull' is set to true - will force pull the data from the object store" + fi +fi + export STRATEGY={strategy} if [ -z "$STRATEGY" ] || [ "$STRATEGY" == "None" ]; then @@ -242,7 +284,7 @@ def download_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') - output_file = '{MODEL_PVC_MOUNT_PATH}/data.tar.gz' + output_file = '{data_pvc_mount_path}/data.tar.gz' s3.download_file(bucket_name, s3_key, output_file) @@ -251,7 +293,7 @@ def upload_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') # TODO: change the name for the model name - input_file = '{MODEL_PVC_MOUNT_PATH}/data.tar.gz' # TODO: change for model path + input_file = '{data_pvc_mount_path}/data.tar.gz' # TODO: change for model path s3.upload_file(input_file, bucket_name, s3_key) @@ -270,13 +312,10 @@ python "$tmp"/download_s3.py if [ "$STRATEGY" == "download" ]; then # List top-level directories only (no nested directories) - top_level_dirs=$(tar --exclude='*/*' --list --file {MODEL_PVC_MOUNT_PATH}/data.tar.gz) - - # List of directories we expect in the archive - expected_dirs=("data" "model") + top_level_dirs=$(tar --exclude='*/*' --list --file {data_pvc_mount_path}/data.tar.gz) # Loop through the expected directories and check if they exist in the archive - for dir in "${expected_dirs[@]}"; do + for dir in data model; do if ! echo "$top_level_dirs" | grep -q "^$dir/$"; then echo "Archive does not contain a '$dir' directory" exit 1 @@ -284,13 +323,9 @@ if [ "$STRATEGY" == "download" ]; then done echo "All expected directories are present." - # First extract SDG data in the SDG PVC - mkdir -p {SDG_PVC_MOUNT_PATH}/generated - tar -C {SDG_PVC_MOUNT_PATH}/generated -xf data.tar.gz --strip-components=1 data/ - - # Then extract the model in the model PVC - mkdir -p {MODEL_PVC_MOUNT_PATH}/model - tar -C {MODEL_PVC_MOUNT_PATH} -xf {MODEL_PVC_MOUNT_PATH}/data.tar.gz --strip-components=1 model/ + echo "Extracting data from the archive" + tar -C {data_pvc_mount_path} -xvf {data_pvc_mount_path}/data.tar.gz + ls -laR {data_pvc_mount_path} fi """ @@ -329,6 +364,7 @@ spec: PYTHON_EXECUTOR = """ set -e +export XDG_CACHE_HOME=/tmp tmp=$(mktemp -d) cat < "$tmp"/exec.py @@ -447,9 +483,7 @@ def show( @cli.group(invoke_without_command=True) -@click.option( - "--namespace", type=str, default="default", help="Kubernetes namespace to use" -) +@click.option("--namespace", type=str, help="Kubernetes namespace to use") @click.option( "--taxonomy-repo-url", type=str, @@ -529,7 +563,11 @@ def show( ) @click.option( "--model-to-train", - help="Path to model to train (PVC filesystem path)", + help=( + "Path to model to train (PVC filesystem path). " + "Useful when calling training phases independently and users wants to point to the epoch directory. " + "Very advanced usage, not recommended for general use." + ), type=str, ) @click.option( @@ -599,10 +637,24 @@ def show( ), type=str, ) +@click.option( + "--force-pull", + help="Force pull the data (sdg data and model) from the object store even if it already exists in the PVC.", + is_flag=True, + default=False, +) +@click.option( + "--training-1-epoch-num", help="Number of epochs to train the model for.", default=7 +) +@click.option( + "--training-2-epoch-num", + help="Number of epochs to train the model for.", + default=10, +) @click.pass_context def run( ctx: click.Context, - namespace: typing.Optional[str] = "default", + namespace: typing.Optional[str] = None, taxonomy_repo_url: str = "", taxonomy_repo_branch: typing.Optional[str] = "", taxonomy_repo_pr: typing.Optional[str] = "", @@ -624,6 +676,9 @@ def run( sdg_object_store_data_key: typing.Optional[str] = None, sdg_object_store_verify_tls: typing.Optional[bool] = None, sdg_object_store_secret: typing.Optional[str] = None, + force_pull: typing.Optional[bool] = False, + training_1_epoch_num: int = 7, + training_2_epoch_num: int = 10, ): """ Execute the distributed training on Kubernetes. @@ -651,6 +706,9 @@ def run( sdg_object_store_data_key (str): The name of the tarball that contains SDG data. sdg_object_store_verify_tls (bool): Verify TLS for the object store. sdg_object_store_secret (str): The name of the Kubernetes Secret containing the SDG object store credentials. The namespace is inferred from the namespace option. + force_pull (bool): Force pull the data (sdg data and model) from the object store even if it already exists in the PVC. + training_1_epoch_num (int): Number of epochs to train the model for during phase 1. + training_2_epoch_num (int): Number of epochs to train the model for during phase 2. Returns: None @@ -678,6 +736,9 @@ def run( ctx.obj["sdg_object_store_data_key"] = sdg_object_store_data_key ctx.obj["sdg_object_store_verify_tls"] = sdg_object_store_verify_tls ctx.obj["sdg_object_store_secret"] = sdg_object_store_secret + ctx.obj["force_pull"] = force_pull + ctx.obj["training_1_epoch_num"] = training_1_epoch_num + ctx.obj["training_2_epoch_num"] = training_2_epoch_num ########################## # MAIN WORKFLOW SEQUENCE # @@ -707,11 +768,19 @@ def run( # ctx.obj["model_to_train"] = best_model.get("model") # Training Phase 2 - # ctx.invoke(train) + ctx.obj["training_phase"] = "2" + ctx.invoke(train) # Evaluation of phase 2 with MT-Bench - # ctx.obj["eval_type"] = "mt-bench" - # _ = ctx.invoke(evaluation) + ctx.obj["eval_type"] = "mt-bench" + scores = ctx.invoke(evaluation) + scores = json.loads(scores) + best_model = max(scores, key=lambda x: x["average_score"]) + logger.info("Best model: %s", best_model.get("model")) + ctx.obj["candidate_model"] = best_model.get("model") + + # Final evaluation + # TODO def get_security_context() -> kubernetes.client.V1SecurityContext: @@ -724,33 +793,27 @@ def get_security_context() -> kubernetes.client.V1SecurityContext: ) -def get_sdg_vol_mount() -> kubernetes.client.V1VolumeMount: +def get_vol_mount() -> list[kubernetes.client.V1VolumeMount]: """ Get the volume mount for the SDG job. """ return [ kubernetes.client.V1VolumeMount( - name=SDG_VOLUME_NAME, mount_path=SDG_PVC_MOUNT_PATH - ), - kubernetes.client.V1VolumeMount( - name=MODEL_VOLUME_NAME, mount_path=MODEL_PVC_MOUNT_PATH - ), - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH + name=DATA_VOLUME_NAME, mount_path=DATA_PVC_MOUNT_PATH ), ] -def get_fetch_sdg_vol_mount() -> kubernetes.client.V1VolumeMount: +def get_vol() -> list[kubernetes.client.V1Volume]: """ - Get the volume mount for the SDG job. + Get the volume for the SDG job. """ return [ - kubernetes.client.V1VolumeMount( - name=SDG_VOLUME_NAME, mount_path=SDG_PVC_MOUNT_PATH - ), - kubernetes.client.V1VolumeMount( - name=MODEL_VOLUME_NAME, mount_path=MODEL_PVC_MOUNT_PATH + kubernetes.client.V1Volume( + name=DATA_VOLUME_NAME, + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=DATA_PVC_NAME + ), ), ] @@ -812,7 +875,7 @@ def create_sdg_job( image="{{exec_git_clone_op_image}}", command=["/bin/sh", "-c"], args={{exec_git_clone_op_args}}, - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), kubernetes.client.V1Container( @@ -826,7 +889,7 @@ def create_sdg_job( python_main=exec_sdg_op_args.strip(), ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -847,7 +910,7 @@ def create_sdg_job( python_main=exec_huggingface_importer_op_args.strip(), ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -868,7 +931,7 @@ def create_sdg_job( python_main=exec_data_processing_op_args.strip(), ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), ] @@ -890,30 +953,13 @@ def create_sdg_job( name="copy-model-to-pvc", image=TOOLBOX_IMAGE, command=["/bin/sh", "-c"], - args=[f"cp -r -v {MODEL_PVC_MOUNT_PATH} {TRAINING_PVC_MOUNT_PATH}"], - volume_mounts=get_sdg_vol_mount(), + args=[ + f"cp -r -v {DATA_PVC_MOUNT_PATH} {DATA_PVC_MOUNT_PATH}" + ], # TODO: fix me, dumb line to pass linter, this feat is unused anyway + volume_mounts=get_vol_mount(), ) - volumes = [ - kubernetes.client.V1Volume( - name=SDG_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=SDG_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=MODEL_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=MODEL_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] + volumes = get_vol() # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( @@ -946,6 +992,7 @@ def create_sdg_data_fetch_job( namespace: str, job_name: str, sdg_object_store_secret: str, + force_pull: bool = False, ) -> kubernetes.client.V1Job: """ Create a Kubernetes Job object. @@ -961,107 +1008,125 @@ def create_sdg_data_fetch_job( kubernetes.client.V1Job: A Kubernetes Job object configured with the specified parameters. """ - container = kubernetes.client.V1Container( - name="fetch-sdg-files-from-object-store", - image=PYTHON_IMAGE, - command=["/bin/sh", "-c"], - args=[ - SDG_DATA_SCRIPT.format( - strategy="download", - MODEL_PVC_MOUNT_PATH=MODEL_PVC_MOUNT_PATH, # TODO: DOWNLOAD ON THE MODEL PVC!! - ) - ], - volume_mounts=get_fetch_sdg_vol_mount(), - env=[ - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ENDPOINT", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="endpoint", optional=True - ) - ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_BUCKET", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="bucket", optional=False - ) + exec_data_processing_op_command = """ +{{exec_data_processing_op_command}} +""" + exec_data_processing_op_args = """ +{{exec_data_processing_op_args}} +""" + + init_containers = [ + kubernetes.client.V1Container( + name="fetch-sdg-files-from-object-store", + # image=PYTHON_IMAGE, + image="quay.io/opendatahub/workbench-images:jupyter-datascience-ubi9-python-3.11-20241004-609ffb8", + command=["/bin/sh", "-c"], + args=[ + DATA_SCRIPT.format( + strategy="download", + force_pull=force_pull, + data_pvc_mount_path=DATA_PVC_MOUNT_PATH, + ) + ], + volume_mounts=get_vol_mount(), + env=[ + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ENDPOINT", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="endpoint", optional=True + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ACCESS_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="access_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_BUCKET", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="bucket", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_SECRET_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="secret_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ACCESS_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="access_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_REGION", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="region", optional=True - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_SECRET_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="secret_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_DATA_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="data_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_REGION", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="region", optional=True + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_MODEL_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="model_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_DATA_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="data_key", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_VERIFY_TLS", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="verify_tls", optional=True - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_VERIFY_TLS", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="verify_tls", + optional=True, + ) + ), ), + ], + ) + ] + + container = kubernetes.client.V1Container( + name="sdg-op-generate-synthetic-data", + # image="{{exec_sdg_op_image}}", + image="registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.1-1724960989", + command=["/bin/sh", "-ce"], + args=[ + PYTHON_EXECUTOR.format( + python_code=exec_data_processing_op_command, + python_main=exec_data_processing_op_args.strip(), ), ], - ) - - volumes = [ - kubernetes.client.V1Volume( - name=SDG_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=SDG_PVC_NAME + volume_mounts=get_vol_mount(), + security_context=get_security_context(), + env_from=[ + kubernetes.client.V1EnvFromSource( + config_map_ref=kubernetes.client.V1ConfigMapEnvSource(name=K8S_NAME) ), - ), - kubernetes.client.V1Volume( - name=MODEL_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=MODEL_PVC_NAME + kubernetes.client.V1EnvFromSource( + secret_ref=kubernetes.client.V1SecretEnvSource(name=K8S_NAME) ), - ), - ] + ], + ) # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(labels={"app": "sdg-data-fetch"}), spec=kubernetes.client.V1PodSpec( restart_policy="Never", + init_containers=init_containers, containers=[container], - volumes=volumes, + volumes=get_vol(), ), ) @@ -1144,11 +1209,7 @@ def create_eval_job( python_main=exec_run_mt_bench_op_args.strip(), ), ], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH - ), - ], + volume_mounts=get_vol_mount(), env_from=[ kubernetes.client.V1EnvFromSource( config_map_ref=kubernetes.client.V1ConfigMapEnvSource( @@ -1168,24 +1229,11 @@ def create_eval_job( image="{{exec_run_mt_bench_op_image}}", command=["/bin/sh", "-c"], args=[f"cat {MT_BENCH_SCORES_PATH}"], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH - ), - ], + volume_mounts=get_vol_mount(), ) else: raise ValueError(f"Unknown evaluation type: {eval_type}") - volumes = [ - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] - # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(labels={"app": "eval"}), @@ -1193,7 +1241,7 @@ def create_eval_job( restart_policy="Never", init_containers=init_containers, containers=[container], - volumes=volumes, + volumes=get_vol(), ), ) @@ -1249,6 +1297,7 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: # Wait for the job to complete w = kubernetes.watch.Watch() + pod_log = None for event in w.stream(batch_v1.list_namespaced_job, namespace=namespace): job_event = event["object"] if job_event.metadata.name != job.metadata.name: @@ -1262,6 +1311,8 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: job.spec.template.metadata.labels["app"] ), ) + # On success return the logs of the last pod which contains the output + # (useful to get eval scores) pod_log = core_v1.read_namespaced_pod_log( name=pods.items[0].metadata.name, namespace=namespace ) @@ -1374,25 +1425,11 @@ def sdg( # list of PVCs to create and their details pvcs = [ { - "name": SDG_PVC_NAME, + "name": DATA_PVC_NAME, "namespace": namespace, "storage_class": storage_class, "access_modes": ["ReadWriteOnce"], - "size": "1Gi", - }, - { - "name": MODEL_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteOnce"], - "size": "50Gi", - }, - { - "name": TRAINING_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteMany"], - "size": "50Gi", + "size": "200Gi", }, ] for pvc in pvcs: @@ -1400,7 +1437,7 @@ def sdg( v1.create_namespaced_persistent_volume_claim( namespace=namespace, body=create_pvc(**pvc) ) - logger.info("Successfully creayed PVC '%s' created.", pvc.get("name")) + logger.info("Successfully created PVC '%s' created.", pvc.get("name")) except kubernetes.client.rest.ApiException as exc: if exc.status == 409: logger.info("PVC '%s' already exists.", pvc["name"]) @@ -1485,6 +1522,7 @@ def sdg_data_fetch( sdg_object_store_data_key = ctx.obj["sdg_object_store_data_key"] sdg_object_store_verify_tls = ctx.obj["sdg_object_store_verify_tls"] sdg_object_store_secret = ctx.obj["sdg_object_store_secret"] + force_pull = ctx.obj["force_pull"] # Make sure the endpoint is a valid URL validate_url(eval_serving_endpoint) @@ -1591,6 +1629,7 @@ def sdg_data_fetch( eval_serving_endpoint=eval_serving_endpoint, eval_serving_model_name=eval_serving_model_name, eval_serving_model_api_key=eval_serving_model_api_key, + eval_serving_name=EVAL_SERVING_NAME, ) ) ) @@ -1614,18 +1653,11 @@ def sdg_data_fetch( # list of PVCs to create and their details pvcs = [ { - "name": SDG_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteOnce"], - "size": "10Gi", # SDG Data set can be big so let's go with a safe size - }, - { - "name": MODEL_PVC_NAME, + "name": DATA_PVC_NAME, "namespace": namespace, "storage_class": storage_class, "access_modes": ["ReadWriteMany"], - "size": "100Gi", # Model can be big so let's go with a safe size + "size": "200Gi", # Allocate size for a few models and large SDG data sets }, ] for pvc in pvcs: @@ -1645,6 +1677,7 @@ def sdg_data_fetch( namespace=namespace, job_name="sdg-data-fetch", sdg_object_store_secret=sdg_object_store_secret, + force_pull=force_pull, ) # Run the job @@ -1664,27 +1697,35 @@ def train( training_phase = ctx.obj["training_phase"] path_to_model = ctx.obj["model_to_train"] nproc_per_node: int = ctx.obj["nproc_per_node"] + training_1_epoch_num: int = ctx.obj["training_1_epoch_num"] + training_2_epoch_num: int = ctx.obj["training_2_epoch_num"] if training_phase is None: raise ValueError("Training phase must be provided with --training-phase=[1|2]") # During the initial training if path_to_model is None: - path_to_model = "/input_model" + path_to_model = DATA_PVC_MODEL_PATH + + epoch_num = None + if training_phase == "1": + epoch_num = training_1_epoch_num + elif training_phase == "2": + epoch_num = training_2_epoch_num logger.info("Running multi-phased distributed training phase %s", training_phase) worker_replicas = PYTORCH_NNODES - 1 pytorch_training_job_yaml = yaml.safe_load( PYTORCH_TRAINING_JOB.format( - name="train-sdg", - model_pvc_name="model", - input_pvc_name="sdg-data", - output_pvc_name="training-data", + name=f"train-phase-{training_phase}", + data_pvc_name=DATA_PVC_NAME, path_to_model=path_to_model, nproc_per_node=nproc_per_node, - PYTORCH_NNODES=PYTORCH_NNODES, + nnodes=PYTORCH_NNODES, PYTORCH_IMAGE=PYTORCH_IMAGE, worker_replicas=worker_replicas, + epoch_num=epoch_num, + phase_num=training_phase, ) ) diff --git a/training/components.py b/training/components.py index f1239d46..d178a04d 100644 --- a/training/components.py +++ b/training/components.py @@ -54,7 +54,7 @@ def data_processing(train_args: TrainingArgs) -> None: # early validation logic here if train_args.max_batch_len < train_args.max_seq_len: raise ValueError( - f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}" + f"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=} < {train_args.max_seq_len=}" ) # process the training data diff --git a/utils/helpers/helpers.py b/utils/helpers/helpers.py index 326fafe8..1e81f20a 100644 --- a/utils/helpers/helpers.py +++ b/utils/helpers/helpers.py @@ -51,7 +51,7 @@ def launch_vllm(model_path: str, gpu_count: int, retries: int = 60, delay: int = # This seems like excessive effort to stop the vllm process, but merely saving & killing the pid doesn't work -# Also, the base image does not include `pkill` cmd, so can't pkill -f vllm.entrypoints.openai.api_server either +# Also, the base image does not include 'pkill' cmd, so can't pkill -f vllm.entrypoints.openai.api_server either def stop_vllm(): import psutil