diff --git a/.github/workflows/pre_commit.yaml b/.github/workflows/pre_commit.yaml index 15c591dd..cf7d996e 100644 --- a/.github/workflows/pre_commit.yaml +++ b/.github/workflows/pre_commit.yaml @@ -34,3 +34,9 @@ jobs: - name: Run pre-commit run: | pre-commit run --all-files + + - name: Test if pipeline is up-to-date + run: | + pip install click kfp==2.9.0 kfp.kubernetes + make pipeline + git diff --exit-code || (echo "Pipeline is not up-to-date. Please run 'make pipeline' and commit the changes." && exit 1) diff --git a/Makefile b/Makefile index 769cf6dd..8a1a5695 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ -.PHONY: standalone +.PHONY: standalone pipeline standalone: python3 pipeline.py gen-standalone ruff format standalone/standalone.py + +pipeline: + python3 pipeline.py diff --git a/eval/final/components.py b/eval/final/components.py index e4d9036d..7a650edf 100644 --- a/eval/final/components.py +++ b/eval/final/components.py @@ -17,7 +17,6 @@ def run_final_eval_op( mmlu_branch_output: Output[Artifact], mt_bench_branch_output: Output[Artifact], - candidate_model: str, base_model_dir: str, tasks: Input[Dataset], taxonomy: Input[Dataset], @@ -29,6 +28,7 @@ def run_final_eval_op( few_shots: int, batch_size: int, merge_system_user_message: bool, + candidate_model: str = None, ): import json import os @@ -43,6 +43,11 @@ def run_final_eval_op( from instructlab.eval.mt_bench import MTBenchBranchEvaluator from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores, sort_score + # For standalone mode + if candidate_model is None: + # logic to get the best model from the models folder and results + pass + ###################################################################### # branch_eval_summary_to_json creates a json object from output of instructlab/eval # TODO: Add this to the instructlab/eval or instructlab/instructlab repository @@ -221,7 +226,7 @@ def find_node_dataset_directories(base_directory: str): ###################################################################### # TODO: Update ilab/model/evaluate evaluate def logic to allow for external judge model - # and when that happens, much of this logic can be imported from the `evaluate` definition: + # and when that happens, much of this logic can be imported from the 'evaluate' definition: # https://github.com/instructlab/instructlab/blob/83ca501ecdd858677380046e2a56da5b2f3f14e7/src/instructlab/model/evaluate.py#L504 # # With instructlab, model_name is synonomous with model_path @@ -244,8 +249,8 @@ def find_node_dataset_directories(base_directory: str): ), ] - # ilab/evaluate uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # ilab/evaluate uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 if max_workers == "auto": try: diff --git a/eval/mt_bench/components.py b/eval/mt_bench/components.py index 429f4b2a..8f853f9a 100644 --- a/eval/mt_bench/components.py +++ b/eval/mt_bench/components.py @@ -12,8 +12,8 @@ def run_mt_bench_op( models_path_prefix: str, mt_bench_output: Output[Artifact], merge_system_user_message: bool, - # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # generate_answers,judgment uses a magic word for its mt_bench 
evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 max_workers: str, models_list: List[str] = None, @@ -24,13 +24,93 @@ def run_mt_bench_op( import os import torch - from helpers import ( - VLLM_SERVER, - launch_vllm, - stop_vllm, - ) from instructlab.eval.mt_bench import MTBenchEvaluator + VLLM_SERVER = "http://localhost:8000/v1" + + def launch_vllm( + model_path: str, gpu_count: int, retries: int = 120, delay: int = 5 + ): + import subprocess + import sys + import time + + import requests + + if gpu_count > 0: + command = [ + sys.executable, + "-m", + "vllm.entrypoints.openai.api_server", + "--model", + model_path, + "--tensor-parallel-size", + str(gpu_count), + ] + else: + command = [ + sys.executable, + "-m", + "vllm.entrypoints.openai.api_server", + "--model", + model_path, + ] + + subprocess.Popen(args=command) + + print(f"Waiting for vLLM server to start at {VLLM_SERVER}...") + + for attempt in range(retries): + try: + response = requests.get(f"{VLLM_SERVER}/models") + if response.status_code == 200: + print(f"vLLM server is up and running at {VLLM_SERVER}.") + return + except requests.ConnectionError: + pass + + print( + f"Server not available yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})..." + ) + time.sleep(delay) + + raise RuntimeError( + f"Failed to start vLLM server at {VLLM_SERVER} after {retries} retries." + ) + + # This seems like excessive effort to stop the vllm process, but merely saving & killing the pid doesn't work + # Also, the base image does not include 'pkill' cmd, so can't pkill -f vllm.entrypoints.openai.api_server either + def stop_vllm(): + import psutil + + for process in psutil.process_iter(attrs=["pid", "name", "cmdline"]): + cmdline = process.info.get("cmdline") + if cmdline and "vllm.entrypoints.openai.api_server" in cmdline: + print( + f"Found vLLM server process with PID: {process.info['pid']}, terminating..." + ) + try: + process.terminate() # Try graceful termination + process.wait(timeout=5) # Wait a bit for it to terminate + if process.is_running(): + print( + f"Forcefully killing vLLM server process with PID: {process.info['pid']}" + ) + process.kill() # Force kill if it's still running + print( + f"Successfully stopped vLLM server with PID: {process.info['pid']}" + ) + except psutil.NoSuchProcess: + print(f"Process with PID {process.info['pid']} no longer exists.") + except psutil.AccessDenied: + print( + f"Access denied when trying to terminate process with PID {process.info['pid']}." + ) + except Exception as e: + print( + f"Failed to terminate process with PID {process.info['pid']}. 
Error: {e}" + ) + os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" gpu_available = torch.cuda.is_available() @@ -53,8 +133,8 @@ def run_mt_bench_op( scores = {} all_mt_bench_data = [] - # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto` - # with `auto`, number of gpus allocated for serving is calculated based on environment + # generate_answers,judgment uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 if max_workers == "auto": try: diff --git a/pipeline.py b/pipeline.py index 1f8d12e0..cfbfe585 100644 --- a/pipeline.py +++ b/pipeline.py @@ -348,7 +348,7 @@ def pipeline( final_eval_task.set_accelerator_type("nvidia.com/gpu") final_eval_task.set_accelerator_limit(1) - # Technically `output_model_task` and `output_data_task` can happen before evaluation, + # Technically 'output_model_task' and 'output_data_task' can happen before evaluation, # however the PVC can only be mounted once, so, setting these to _after_ so the eval proceeds. output_model_task = pvc_to_artifact_op( pvc_path="/output/data", @@ -417,7 +417,7 @@ def gen_standalone(): This function should be used when Kubeflow Pipelines are not available. It will generate a script that replicates the pipeline's functionality. - Example usage: ``` $ python pipeline.py gen-standalone ``` + Example usage: ''' $ python pipeline.py gen-standalone ''' """ from os import path @@ -442,145 +442,11 @@ def gen_standalone(): # The list of executor names to extract details from to generate the standalone script executors = { - "exec-data-processing-op": { - "inputs": { - "parameterValues": { - "max_seq_len": 4096, - "max_batch_len": 20000, - }, - "artifacts": { - "sdg": { - "artifacts": [ - { - "name": "sdg", - "uri": "/input_data/generated", # TODO: do not hardcode!! - } - ] - }, - "model": { - "artifacts": [ - { - "name": "model", - "uri": "/input_model", # TODO: do not hardcode!! - } - ] - }, - }, - }, - "outputs": { - "outputFile": "/tmp/kfp_outputs/output_metadata.json", - "artifacts": { - "processed_data": { - "artifacts": [ - { - "name": "processed_data", - "uri": "/input_data/processed_data", # TODO: do not hardcode!! - } - ] - }, - }, - }, - }, - "exec-sdg-op": { - "inputs": { - "parameterValues": { - "num_instructions_to_generate": 2, - "repo_branch": "", - "repo_pr": "", - }, - "artifacts": { - "taxonomy": { - "artifacts": [ - { - "name": "taxonomy", - "uri": "/input_data/taxonomy", # TODO: do not hardcode!! - } - ] - } - }, - }, - "outputs": { - "outputFile": "/tmp/kfp_outputs/output_metadata.json", - "artifacts": { - "sdg": { - "artifacts": [ - { - "name": "sdg", - "uri": "/input_data/generated", # TODO: do not hardcode!! - } - ] - }, - }, - }, - }, + "exec-data-processing-op": 'data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/data/data", model="/data/model", processed_data="/data/processed_data")', + "exec-sdg-op": 'sdg_op(num_instructions_to_generate=2, repo_branch="", repo_pr="", taxonomy="/data/taxonomy", sdg="/data/generated")', "exec-git-clone-op": {}, - "exec-huggingface-importer-op": { - "inputs": { - "parameterValues": { - "repo_name": BASE_MODE, - }, - }, - "outputs": { - "outputFile": "/tmp/kfp_outputs/output_metadata.json", - "artifacts": { - "model": { - "artifacts": [ - { - "name": "model", - "uri": "/input_model", # TODO: do not hardcode!! 
- } - ] - }, - }, - }, - }, - "exec-run-mmlu-op": { - "inputs": { - "parameterValues": { - "models_path_prefix": "/output/model/hf_format", - "mmlu_tasks_list": MMLU_TASKS_LIST, - "model_dtype": MODEL_DTYPE, - "few_shots": FEW_SHOTS, - "batch_size": BATCH_SIZE, - "models_folder": "/output/model/hf_format", - }, - }, - "outputs": { - "outputFile": "/tmp/kfp_outputs/output_metadata.json", - "artifacts": { - "mmlu_output": { - "artifacts": [ - { - "name": "mmlu_output", - "uri": "/output/mmlu-results.txt", # TODO: do not hardcode!! - } - ] - }, - }, - }, - }, - "exec-run-mt-bench-op": { - "inputs": { - "parameterValues": { - "models_path_prefix": "/output/model/hf_format", - "merge_system_user_message": MERGE_SYSTEM_USER_MESSAGE, - "max_workers": MAX_WORKERS, - }, - }, - "outputs": { - "outputFile": "/tmp/kfp_outputs/output_metadata.json", - "artifacts": { - "mt_bench_output": { - "artifacts": [ - { - "name": "mt_bench_output", - "uri": "/output/mt-bench-results.txt", # TODO: do not hardcode!! - } - ] - }, - }, - }, - }, + "exec-huggingface-importer-op": 'huggingface_importer_op(repo_name="ibm-granite/granite-7b-base", model="/data/model")', + "exec-run-mt-bench-op": 'run_mt_bench_op(mt_bench_output="/data/mt-bench-results.txt", models_folder="/data/model/output/hf_format", models_path_prefix="/data/model/output/hf_format", max_workers="auto", merge_system_user_message=False)', } details = {} @@ -591,14 +457,18 @@ def gen_standalone(): executor_details = get_executor_details(documents, executor_name) if executor_details is not None: details[executor_name_camelize + "_image"] = executor_details["image"] - details[executor_name_camelize + "_command"] = executor_details[ - "command" - ] - details[executor_name_camelize + "_args"] = remove_template_markers( - executor_details["args"], - executor_name_camelize, - executor_input_param, + details[executor_name_camelize + "_command"] = ( + change_dsl_function_to_normal_function(executor_details["command"]) ) + if executor_name == "exec-git-clone-op": + details[executor_name_camelize + "_args"] = remove_template_markers( + executor_details["args"], + executor_name_camelize, + executor_input_param, + ) + else: + details[executor_name_camelize + "_args"] = executor_input_param + except ValueError as e: click.echo(f"Error: {e}", err=True) raise click.exceptions.Exit(1) @@ -741,5 +611,33 @@ def remove_template_markers( return rendered_code +def change_dsl_function_to_normal_function(rendered_code: list): + replacements = { + "dsl.Input[dsl.Dataset]": "str", + "dsl.Input[dsl.Model]": "str", + "dsl.Input[dsl.Artifact]": "str", + "dsl.Output[dsl.Dataset]": "str", + "dsl.Output[dsl.Model]": "str", + "Output[Artifact]": "str", + "import kfp": "", + "from kfp import dsl": "", + "from kfp.dsl import *": "", + } + + import re + + # Regular expression to match ".path" but not "os.path" + path_pattern = re.compile(r"(? 
None:\n \ \ # early validation logic here\n if train_args.max_batch_len\ \ < train_args.max_seq_len:\n raise ValueError(\n \ - \ f\"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=}\ + \ f\"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=}\ \ < {train_args.max_seq_len=}\"\n )\n\n # process\ \ the training data\n if not os.path.exists(train_args.data_output_dir):\n\ \ os.makedirs(train_args.data_output_dir, exist_ok=True)\n \ @@ -1008,16 +1009,18 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef run_final_eval_op(\n mmlu_branch_output: Output[Artifact],\n\ - \ mt_bench_branch_output: Output[Artifact],\n candidate_model: str,\n\ - \ base_model_dir: str,\n tasks: Input[Dataset],\n taxonomy: Input[Dataset],\n\ - \ base_branch: str,\n candidate_branch: str,\n max_workers: str,\n\ - \ device: str,\n model_dtype: str,\n few_shots: int,\n batch_size:\ - \ int,\n merge_system_user_message: bool,\n):\n import json\n import\ + \ mt_bench_branch_output: Output[Artifact],\n base_model_dir: str,\n\ + \ tasks: Input[Dataset],\n taxonomy: Input[Dataset],\n base_branch:\ + \ str,\n candidate_branch: str,\n max_workers: str,\n device: str,\n\ + \ model_dtype: str,\n few_shots: int,\n batch_size: int,\n merge_system_user_message:\ + \ bool,\n candidate_model: str = None,\n):\n import json\n import\ \ os\n\n import torch\n from helpers import (\n VLLM_SERVER,\n\ \ launch_vllm,\n stop_vllm,\n )\n from instructlab.eval.mmlu\ \ import MMLU_TASKS, MMLUBranchEvaluator\n from instructlab.eval.mt_bench\ \ import MTBenchBranchEvaluator\n from instructlab.model.evaluate import\ - \ qa_pairs_to_qna_to_avg_scores, sort_score\n\n ######################################################################\n\ + \ qa_pairs_to_qna_to_avg_scores, sort_score\n\n # For standalone mode\n\ + \ if candidate_model is None:\n # logic to get the best model\ + \ from the models folder and results\n pass\n\n ######################################################################\n\ \ # branch_eval_summary_to_json creates a json object from output of\ \ instructlab/eval\n # TODO: Add this to the instructlab/eval or instructlab/instructlab\ \ repository\n def branch_eval_summary_to_json(\n improvements:\ @@ -1107,7 +1110,7 @@ deploymentSpec: main\"\n\n ######################################################################\n\ \ # TODO: Update ilab/model/evaluate evaluate def logic to allow for\ \ external judge model\n # and when that happens, much of this logic\ - \ can be imported from the `evaluate` definition:\n # https://github.com/instructlab/instructlab/blob/83ca501ecdd858677380046e2a56da5b2f3f14e7/src/instructlab/model/evaluate.py#L504\n\ + \ can be imported from the 'evaluate' definition:\n # https://github.com/instructlab/instructlab/blob/83ca501ecdd858677380046e2a56da5b2f3f14e7/src/instructlab/model/evaluate.py#L504\n\ \ #\n # With instructlab, model_name is synonomous with model_path\n\ \ mt_bench_evaluators = [\n MTBenchBranchEvaluator(\n \ \ model_name=candidate_model,\n judge_model_name=judge_model_name,\n\ @@ -1118,7 +1121,7 @@ deploymentSpec: \ branch=base_branch,\n output_dir=output_dir,\n \ \ merge_system_user_message=merge_system_user_message,\n \ \ ),\n ]\n\n # ilab/evaluate uses a magic word for its mt_bench\ - \ evaluator - `auto`\n # with `auto`, number of gpus allocated for serving\ + \ evaluator - 'auto'\n # with 'auto', number of gpus allocated for serving\ \ is calculated based on 
environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ \ if max_workers == \"auto\":\n try:\n usable_cpu_count\ \ = len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n\ @@ -1197,15 +1200,59 @@ deploymentSpec: - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef run_mt_bench_op(\n models_path_prefix: str,\n mt_bench_output:\ \ Output[Artifact],\n merge_system_user_message: bool,\n # generate_answers,judgment\ - \ uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`,\ + \ uses a magic word for its mt_bench evaluator - 'auto'\n # with 'auto',\ \ number of gpus allocated for serving is calculated based on environment\n\ \ # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ \ max_workers: str,\n models_list: List[str] = None,\n models_folder:\ \ Optional[str] = None,\n device: str = None,\n) -> NamedTuple(\"outputs\"\ , best_model=str, best_score=float):\n import json\n import os\n\n\ - \ import torch\n from helpers import (\n VLLM_SERVER,\n \ - \ launch_vllm,\n stop_vllm,\n )\n from instructlab.eval.mt_bench\ - \ import MTBenchEvaluator\n\n os.environ[\"PYTORCH_CUDA_ALLOC_CONF\"\ + \ import torch\n from instructlab.eval.mt_bench import MTBenchEvaluator\n\ + \n VLLM_SERVER = \"http://localhost:8000/v1\"\n\n def launch_vllm(\n\ + \ model_path: str, gpu_count: int, retries: int = 120, delay: int\ + \ = 5\n ):\n import subprocess\n import sys\n import\ + \ time\n\n import requests\n\n if gpu_count > 0:\n \ + \ command = [\n sys.executable,\n \"-m\"\ + ,\n \"vllm.entrypoints.openai.api_server\",\n \ + \ \"--model\",\n model_path,\n \"--tensor-parallel-size\"\ + ,\n str(gpu_count),\n ]\n else:\n \ + \ command = [\n sys.executable,\n \"\ + -m\",\n \"vllm.entrypoints.openai.api_server\",\n \ + \ \"--model\",\n model_path,\n ]\n\n \ + \ subprocess.Popen(args=command)\n\n print(f\"Waiting for vLLM\ + \ server to start at {VLLM_SERVER}...\")\n\n for attempt in range(retries):\n\ + \ try:\n response = requests.get(f\"{VLLM_SERVER}/models\"\ + )\n if response.status_code == 200:\n \ + \ print(f\"vLLM server is up and running at {VLLM_SERVER}.\")\n \ + \ return\n except requests.ConnectionError:\n \ + \ pass\n\n print(\n f\"Server not available\ + \ yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})...\"\ + \n )\n time.sleep(delay)\n\n raise RuntimeError(\n\ + \ f\"Failed to start vLLM server at {VLLM_SERVER} after {retries}\ + \ retries.\"\n )\n\n # This seems like excessive effort to stop\ + \ the vllm process, but merely saving & killing the pid doesn't work\n \ + \ # Also, the base image does not include 'pkill' cmd, so can't pkill\ + \ -f vllm.entrypoints.openai.api_server either\n def stop_vllm():\n \ + \ import psutil\n\n for process in psutil.process_iter(attrs=[\"\ + pid\", \"name\", \"cmdline\"]):\n cmdline = process.info.get(\"\ + cmdline\")\n if cmdline and \"vllm.entrypoints.openai.api_server\"\ + \ in cmdline:\n print(\n f\"Found vLLM\ + \ server process with PID: {process.info['pid']}, terminating...\"\n \ + \ )\n try:\n process.terminate()\ + \ # Try graceful termination\n process.wait(timeout=5)\ + \ # Wait a bit for it to terminate\n if process.is_running():\n\ + \ print(\n f\"Forcefully\ + \ killing vLLM server process with PID: {process.info['pid']}\"\n \ + \ )\n process.kill() # Force kill\ + \ if it's still running\n print(\n \ + \ f\"Successfully stopped vLLM server with PID: {process.info['pid']}\"\ + \n )\n except 
psutil.NoSuchProcess:\n\ + \ print(f\"Process with PID {process.info['pid']} no\ + \ longer exists.\")\n except psutil.AccessDenied:\n \ + \ print(\n f\"Access denied when trying\ + \ to terminate process with PID {process.info['pid']}.\"\n \ + \ )\n except Exception as e:\n print(\n\ + \ f\"Failed to terminate process with PID {process.info['pid']}.\ + \ Error: {e}\"\n )\n\n os.environ[\"PYTORCH_CUDA_ALLOC_CONF\"\ ] = \"expandable_segments:True\"\n\n gpu_available = torch.cuda.is_available()\n\ \ gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n\ \ if gpu_available\n else \"No GPU available\"\n )\n \ @@ -1215,7 +1262,7 @@ deploymentSpec: \n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n judge_model_name\ \ = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"JUDGE_ENDPOINT\"\ )\n\n scores = {}\n all_mt_bench_data = []\n\n # generate_answers,judgment\ - \ uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`,\ + \ uses a magic word for its mt_bench evaluator - 'auto'\n # with 'auto',\ \ number of gpus allocated for serving is calculated based on environment\n\ \ # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ \ if max_workers == \"auto\":\n try:\n usable_cpu_count\ @@ -1286,7 +1333,7 @@ deploymentSpec: \ > 0) else \"empty\"\n\n print(\"Generating syntetic dataset for:\"\ )\n print()\n print(read_taxonomy(taxonomy.path, taxonomy_base))\n\ \n # generate_data has a magic word for its taxonomy_base argument -\ - \ `empty`\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n\ + \ 'empty'\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n\ \ generate_data(\n client=client,\n num_instructions_to_generate=num_instructions_to_generate,\n\ \ output_dir=sdg.path,\n taxonomy=taxonomy.path,\n \ \ taxonomy_base=taxonomy_base,\n model_name=model,\n chunk_word_count=1000,\n\ diff --git a/sdg/components.py b/sdg/components.py index 188aced1..d3607b6d 100644 --- a/sdg/components.py +++ b/sdg/components.py @@ -52,7 +52,7 @@ def sdg_op( print() print(read_taxonomy(taxonomy.path, taxonomy_base)) - # generate_data has a magic word for its taxonomy_base argument - `empty` + # generate_data has a magic word for its taxonomy_base argument - 'empty' # it allows generating from the whole repo, see: # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230 generate_data( diff --git a/standalone/README.md b/standalone/README.md index f4c351f4..f7c114f3 100644 --- a/standalone/README.md +++ b/standalone/README.md @@ -77,15 +77,32 @@ The script requires information regarding the location and method for accessing * `--namespace`: The namespace in which the Kubernetes resources are located - **Required** * `--storage-class`: The storage class to use for the PVCs - **Optional** - Default: cluster default storage class. * `--nproc-per-node`: The number of processes to run per node - **Optional** - Default: 1. -* `--sdg-object-store-secret`: The name of the Kubernetes secret containing the SDG object store credentials. -* `--sdg-object-store-endpoint`: The endpoint of the object store. `SDG_OBJECT_STORE_ENDPOINT` environment variable can be used as well. 
-* `--sdg-object-store-bucket`: The bucket name in the object store. `SDG_OBJECT_STORE_BUCKET` environment variable can be used as well.
-* `--sdg-object-store-access-key`: The access key for the object store. `SDG_OBJECT_STORE_ACCESS_KEY` environment variable can be used as well.
-* `--sdg-object-store-secret-key`: The secret key for the object store. `SDG_OBJECT_STORE_SECRET_KEY` environment variable can be used as well.
-* `--sdg-object-store-data-key`: The key for the SDG data in the object store. e.g., `sdg.tar.gz`. `SDG_OBJECT_STORE_DATA_KEY` environment variable can be used as well.
+* `--sdg-object-store-secret`: The name of the Kubernetes secret containing the SDG object store
+  credentials. **Optional** - If not provided, the script will expect the provided CLI options to fetch the SDG data.
+* `--sdg-object-store-endpoint`: The endpoint of the object store. `SDG_OBJECT_STORE_ENDPOINT`
+  environment variable can be used as well. **Optional**
+* `--sdg-object-store-bucket`: The bucket name in the object store. `SDG_OBJECT_STORE_BUCKET`
+  environment variable can be used as well. **Required** - If `--sdg-object-store-secret` is not provided.
+* `--sdg-object-store-access-key`: The access key for the object store.
+  `SDG_OBJECT_STORE_ACCESS_KEY` environment variable can be used as well. **Required** - If `--sdg-object-store-secret` is not provided.
+* `--sdg-object-store-secret-key`: The secret key for the object store.
+  `SDG_OBJECT_STORE_SECRET_KEY` environment variable can be used as well. **Required** - If `--sdg-object-store-secret` is not provided.
+* `--sdg-object-store-data-key`: The key for the SDG data in the object store. e.g.,
+  `data.tar.gz`. `SDG_OBJECT_STORE_DATA_KEY` environment variable can be used as well. **Required** - If `--sdg-object-store-secret` is not provided.
 * `--sdg-object-store-verify-tls`: Whether to verify TLS for the object store endpoint (default:
-  true). `SDG_OBJECT_STORE_VERIFY_TLS` environment variable can be used as well.
-* `--sdg-object-store-region`: The region of the object store. `SDG_OBJECT_STORE_REGION` environment variable can be used as well.
+  true). `SDG_OBJECT_STORE_VERIFY_TLS` environment variable can be used as well. **Optional**
+* `--sdg-object-store-region`: The region of the object store. `SDG_OBJECT_STORE_REGION` environment
+  variable can be used as well. **Optional**
+* `--judge-serving-endpoint`: Serving endpoint for evaluation. e.g.,
+  http://serving.kubeflow.svc.cluster.local:8080/v1 - **Required**
+* `--judge-serving-model-name`: The name of the model to use for evaluation. **Required**
+* `--judge-serving-model-api-key`: The API key for the judge model used for evaluation. `JUDGE_SERVING_MODEL_API_KEY`
+  environment variable can be used as well. **Required**
+* `--force-pull`: Force pull the data (sdg data and model) from the object store even if it already
+  exists in the PVC. **Optional** - Default: false.
+* `--training-1-epoch-num`: The number of epochs to train the model for in phase 1. **Optional** - Default: 7.
+* `--training-2-epoch-num`: The number of epochs to train the model for in phase 2. **Optional** - Default: 10.
+
 
 ## Example End-To-End Workflow
 
@@ -94,17 +111,21 @@ The script requires information regarding the location and method for accessing
 
 The following example demonstrates how to generate SDG data, package it as a tarball, and upload it
 to an object store. This assumes that AWS CLI is installed and configured with the necessary
 credentials.
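After packaging the tarball (the exact commands are shown just below), it can be worth sanity-checking the archive layout before uploading. A minimal sketch, not part of this patch, that mirrors the top-level `data`/`model` directory check the in-cluster data-fetch script performs:

```python
# Hypothetical pre-upload check (not part of the repository): the in-cluster
# fetch script requires the archive to contain top-level 'data/' and 'model/'
# directories before it will extract it.
import sys
import tarfile


def check_archive_layout(archive: str = "data.tar.gz") -> None:
    with tarfile.open(archive, "r:gz") as tar:
        # Collect the first path component of every member in the archive.
        top_level = {member.name.split("/")[0] for member in tar.getmembers()}
    missing = {"data", "model"} - top_level
    if missing:
        sys.exit(f"Archive does not contain top-level directories: {sorted(missing)}")
    print("All expected directories are present.")


if __name__ == "__main__":
    check_archive_layout(sys.argv[1] if len(sys.argv) > 1 else "data.tar.gz")
```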
-In this scenario the name of the bucket is `sdg-data` and the tarball file is `sdg.tar.gz`.
+In this scenario the name of the bucket is `sdg-data` and the tarball file is `data.tar.gz`.
 
 ```bash
 ilab data generate
-cd generated
-tar -czvf sdg.tar.gz *
-aws cp sdg.tar.gz s3://sdg-data/sdg.tar.gz
+mv generated data
+tar -czvf data.tar.gz data model
+aws s3 cp data.tar.gz s3://sdg-data/data.tar.gz
 ```
 
 > [!CAUTION]
-> Ensures SDG data is packaged as a tarball **without** top-level directories. So you must run `tar` inside the directory containing the SDG data.
+> Ensure the SDG data is in a directory called `data` and the model is in a directory called `model`.
+> The tarball must contain two top-level directories: `data` and `model`.
+
+> [!CAUTION]
+> Make sure the tarball is gzip-compressed (`.tar.gz`).
 
 #### Alternative Method to AWS CLI
 
@@ -116,7 +137,7 @@ to upload the SDG data to the object store.
   --object-store-bucket sdg-data \
   --object-store-access-key $ACCESS_KEY \
   --object-store-secret-key $SECRET_KEY \
-  --sdg-data-archive-file-path sdg.tar.gz
+  --sdg-data-archive-file-path data.tar.gz
 ```
 
 Run `./sdg-data-on-s3.py upload --help` to see all available options.
 
@@ -127,7 +148,7 @@ The simplest method to supply the script with the required information for retri
 creating a Kubernetes secret. In the example below, we create a secret called `sdg-data` within the
 `my-namespace` namespace, containing the necessary credentials. Ensure that you update the access
 key and secret key as needed. The `data_key` field refers to the name of the tarball file in the
-object store that holds the SDG data. In this case, it's named `sdg.tar.gz`, as we previously
+object store that holds the SDG data. In this case, it's named `data.tar.gz`, as we previously
 uploaded the tarball to the object store using this name.
 
 ```bash
@@ -142,10 +163,15 @@ stringData:
   bucket: sdg-data
   access_key: *****
   secret_key: *****
-  data_key: sdg.tar.gz
+  data_key: data.tar.gz
 EOF
 
-./standalone run --namespace my-namespace --sdg-object-store-secret sdg-data
+./standalone run \
+  --namespace my-namespace \
+  --judge-serving-endpoint http://serving.kubeflow.svc.cluster.local:8080/v1 \
+  --judge-serving-model-name my-model \
+  --judge-serving-model-api-key ***** \
+  --sdg-object-store-secret sdg-data
 ```
 
 > [!WARNING]
@@ -162,6 +188,13 @@ The list of all supported keys:
 * `endpoint`: The endpoint of the object store, e.g: https://s3.openshift-storage.svc:443 - **Optional**
 * `region`: The region of the object store - **Optional**
 
+> [!NOTE]
+> The `--judge-serving-endpoint` and `--judge-serving-model-name` values will be stored in a ConfigMap
+> named `judge-serving-details` in the same namespace as the resources that the script interacts
+> with (in this case, `my-namespace`).
+> The `--judge-serving-model-api-key` value will be stored in a secret named `judge-serving-details`
+> in the same namespace as the resources that the script interacts with 
(in this case, `my-namespace`) + #### Running the Script Without Kubernetes Secret Alternatively, you can provide the necessary information directly via CLI options or environment, @@ -172,10 +205,13 @@ Secret named `sdg-object-store-credentials` in the same namespace as the resourc ```bash ./standalone run \ --namespace my-namespace \ + --judge-serving-endpoint http://serving.kubeflow.svc.cluster.local:8080/v1 \ + --judge-serving-model-name my-model \ + --judge-serving-model-api-key ***** \ --sdg-object-store-access-key key \ --sdg-object-store-secret-key key \ --sdg-object-store-bucket sdg-data \ - --sdg-object-store-data-key sdg.tar.gz + --sdg-object-store-data-key data.tar.gz ``` #### Advanced Configuration Using an S3-Compatible Object Store @@ -184,11 +220,14 @@ If you don't use the official AWS S3 endpoint, you can provide additional inform ```bash ./standalone run \ - --namespace foo \ + --namespace my-namespace \ + --judge-serving-endpoint http://serving.kubeflow.svc.cluster.local:8080/v1 \ + --judge-serving-model-name my-model \ + --judge-serving-model-api-key ***** \ --sdg-object-store-access-key key \ --sdg-object-store-secret-key key \ --sdg-object-store-bucket sdg-data \ - --sdg-object-store-data-key sdg.tar.gz \ + --sdg-object-store-data-key data.tar.gz \ --sdg-object-store-verify-tls false \ --sdg-object-store-endpoint https://s3.openshift-storage.svc:443 ``` diff --git a/standalone/standalone.py b/standalone/standalone.py index 9a91fc7d..f5113f75 100755 --- a/standalone/standalone.py +++ b/standalone/standalone.py @@ -25,6 +25,7 @@ import json import logging import typing +from os import path from urllib.parse import urlparse import click @@ -45,21 +46,18 @@ DEFAULT_REPO_URL = "https://github.com/instructlab/taxonomy.git" K8S_NAME = "kfp-model-server" TOOLBOX_IMAGE = "registry.access.redhat.com/ubi9/toolbox" -PYTHON_IMAGE = "registry.access.redhat.com/ubi9/python-311:latest" -SDG_PVC_NAME = "sdg-data" -SDG_PVC_MOUNT_PATH = "/input_data" -SDG_VOLUME_NAME = "input-data" -MODEL_PVC_NAME = "model" -MODEL_PVC_MOUNT_PATH = "/input_model" -MODEL_VOLUME_NAME = "model" -TAXONOMY_PATH = SDG_PVC_MOUNT_PATH + "/taxonomy" -TRAINING_PVC_NAME = "training-data" -TRAINING_PVC_MOUNT_PATH = "/output" -TRAINING_VOLUME_NAME = "output" +DS_IMAGE = "quay.io/opendatahub/workbench-images:jupyter-datascience-ubi9-python-3.11-20241004-609ffb8" +RHELAI_IMAGE = "registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.2" +DATA_PVC_NAME = "data" +DATA_PVC_MOUNT_PATH = "/data" +DATA_PVC_MODEL_PATH = path.join(DATA_PVC_MOUNT_PATH, "model") +DATA_VOLUME_NAME = "data" +TAXONOMY_PATH = path.join(DATA_PVC_MOUNT_PATH, "taxonomy") +DATA_PVC_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "output") +DATA_PVC_OUTPUT_DATA_PATH = path.join(DATA_PVC_OUTPUT_PATH, "data") PYTORCH_NNODES = 2 -PYTORCH_IMAGE = "quay.io/shanand/test-train:0.0.4" # MMLU_SCORES_PATH = "/output/mmlu-results.txt" -MT_BENCH_SCORES_PATH = "/output/mt-bench-results.txt" +MT_BENCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-results.txt") SDG_OBJECT_STORE_SECRET_NAME = "sdg-object-store-credentials" KFP_MODEL_SERVER_CM = """ # TODO: remove the following line and replace it with the actual ConfigMap/Secret @@ -81,6 +79,8 @@ """ +JUDGE_SERVING_NAME = "judge-serving-details" + PYTORCH_TRAINING_JOB = """ apiVersion: kubeflow.org/v1 kind: PyTorchJob @@ -100,27 +100,50 @@ containers: - args: - | - mkdir -p /output/model; - mkdir -p /output/data; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /output/model 
--data_output_dir /input_data/processed_data + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" + mkdir -p /data/model; + mkdir -p /data/data; + mkdir -p {path_to_model}/output + export XDG_CACHE_HOME=/tmp + export TRITON_CACHE_DIR=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir={path_to_model}/output \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=1e-4 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' - '--' - image: {PYTORCH_IMAGE} + image: {image} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -131,15 +154,9 @@ cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model + - name: data persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} Worker: replicas: {worker_replicas} restartPolicy: OnFailure @@ -151,27 +168,48 @@ containers: - args: - | + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" mkdir -p /tmp/model; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /tmp/model --data_output_dir /input_data/processed_data + export TRITON_CACHE_DIR=/tmp + export XDG_CACHE_HOME=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir=/tmp/model \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=2e-6 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' - '--' - image: {PYTORCH_IMAGE} + image: {image} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output - readOnly: true + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - 
name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -182,20 +220,26 @@ cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model + - name: data persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} """ # TODO: support signature version? -SDG_DATA_SCRIPT = """ +DATA_SCRIPT = """ set -e +FORCE_PULL={force_pull} +if [ -s {data_pvc_mount_path}/data.tar.gz ] && [ -d {data_pvc_mount_path}/data ] && [ -d {data_pvc_mount_path}/model ] ; then + echo "Data tarball and sdg/model directories already exist in the PVC. Skipping download." + if [ "$FORCE_PULL" == "None" ] || [ "$FORCE_PULL" == "False" ]; then + echo "'--force-pull' is not set - will not force pull the data from the object store" + ls -laR {data_pvc_mount_path} + exit 0 + else + echo "'--force-pull' is set to true - will force pull the data from the object store" + fi +fi + export STRATEGY={strategy} if [ -z "$STRATEGY" ] || [ "$STRATEGY" == "None" ]; then @@ -238,7 +282,7 @@ def download_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') - output_file = '{SDG_PVC_MOUNT_PATH}/sdg.tar.gz' + output_file = '{data_pvc_mount_path}/data.tar.gz' s3.download_file(bucket_name, s3_key, output_file) @@ -247,7 +291,7 @@ def upload_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') # TODO: change the name for the model name - input_file = '{SDG_PVC_MOUNT_PATH}/sdg.tar.gz' # TODO: change for model path + input_file = '{data_pvc_mount_path}/data.tar.gz' # TODO: change for model path s3.upload_file(input_file, bucket_name, s3_key) @@ -264,9 +308,22 @@ def upload_s3_file(): python "$tmp"/download_s3.py -if [[ "$STRATEGY" == "download" ]]; then - mkdir -p {SDG_PVC_MOUNT_PATH}/generated - tar -xvf {SDG_PVC_MOUNT_PATH}/sdg.tar.gz -C {SDG_PVC_MOUNT_PATH}/generated +if [ "$STRATEGY" == "download" ]; then + # List top-level directories only (no nested directories) + top_level_dirs=$(tar --exclude='*/*' --list --file {data_pvc_mount_path}/data.tar.gz) + + # Loop through the expected directories and check if they exist in the archive + for dir in data model; do + if ! echo "$top_level_dirs" | grep -q "^$dir/$"; then + echo "Archive does not contain a '$dir' directory" + exit 1 + fi + done + echo "All expected directories are present." + + echo "Extracting data from the archive" + tar -C {data_pvc_mount_path} -xvf {data_pvc_mount_path}/data.tar.gz + ls -laR {data_pvc_mount_path} fi """ @@ -303,6 +360,23 @@ def upload_s3_file(): name: {script_configmap} """ +PYTHON_EXECUTOR = """ +set -e +export XDG_CACHE_HOME=/tmp + +tmp=$(mktemp -d) +cat < "$tmp"/exec.py + +{python_code} + +if __name__ == "__main__": + {python_main} + +EOF + +python3 "$tmp"/exec.py +""" + @click.group() def cli(): @@ -407,9 +481,7 @@ def show( @cli.group(invoke_without_command=True) -@click.option( - "--namespace", type=str, default="default", help="Kubernetes namespace to use" -) +@click.option("--namespace", type=str, help="Kubernetes namespace to use") @click.option( "--taxonomy-repo-url", type=str, @@ -446,6 +518,30 @@ def show( help="Serving model for SDG - for SDG only", hidden=True, ) +@click.option( + "--judge-serving-endpoint", + type=str, + help=( + "Serving endpoint for evaluation." + "e.g. 
http://serving.kubeflow.svc.cluster.local:8080/v1"
+    ),
+    required=True,
+)
+@click.option(
+    "--judge-serving-model-name",
+    type=str,
+    help="The name of the model to use for evaluation.",
+    required=True,
+)
+@click.option(
+    "--judge-serving-model-api-key",
+    type=str,
+    help=(
+        "Serving model API key for evaluation. " "(JUDGE_SERVING_MODEL_API_KEY env var)"
+    ),
+    envvar="JUDGE_SERVING_MODEL_API_KEY",
+    required=True,
+)
 @click.option(
     "--nproc-per-node",
     type=int,
@@ -465,7 +561,11 @@ def show(
 )
 @click.option(
     "--model-to-train",
-    help="Path to model to train (PVC filesystem path)",
+    help=(
+        "Path to model to train (PVC filesystem path). "
+        "Useful when calling training phases independently and the user wants to point to the epoch directory. "
+        "Very advanced usage, not recommended for general use."
+    ),
     type=str,
 )
 @click.option(
@@ -507,9 +607,11 @@ def show(
     "--sdg-object-store-data-key",
     envvar="SDG_OBJECT_STORE_DATA_KEY",
     help=(
-        "Name of tarball that contains SDG data. (SDG_OBJECT_STORE_DATA_KEY env var)."
-        "The tarball MUST NOT contain a top-level directory. "
-        "To archive your SDG data, use the following command: cd /path/to/data && tar -czvf sdg.tar.gz *"
+        "Name of tarball that contains SDG data AND model files. (SDG_OBJECT_STORE_DATA_KEY env var). "
+        "The tarball MUST contain two directories: data and model. "
+        "The data directory contains the SDG data. "
+        "The model directory contains the model to train. "
+        "To archive, use the following command: tar -czvf data.tar.gz /path/to/data /path/to/model"
     ),
     type=str,
 )
@@ -533,16 +635,33 @@ def show(
     ),
     type=str,
 )
+@click.option(
+    "--force-pull",
+    help="Force pull the data (sdg data and model) from the object store even if it already exists in the PVC.",
+    is_flag=True,
+    default=False,
+)
+@click.option(
+    "--training-1-epoch-num",
+    help="Number of epochs to train the model for in phase 1.",
+    default=7,
+)
+@click.option(
+    "--training-2-epoch-num",
+    help="Number of epochs to train the model for in phase 2.",
+    default=10,
+)
 @click.pass_context
 def run(
     ctx: click.Context,
-    namespace: typing.Optional[str] = "default",
+    namespace: typing.Optional[str] = None,
     taxonomy_repo_url: str = "",
     taxonomy_repo_branch: typing.Optional[str] = "",
     taxonomy_repo_pr: typing.Optional[str] = "",
     storage_class: typing.Optional[str] = None,
     serving_endpoint: typing.Optional[str] = None,
     serving_model: typing.Optional[str] = None,
+    judge_serving_endpoint: typing.Optional[str] = None,
+    judge_serving_model_name: typing.Optional[str] = None,
+    judge_serving_model_api_key: typing.Optional[str] = None,
     nproc_per_node: typing.Optional[int] = 1,
     eval_type: typing.Optional[str] = None,
     training_phase: typing.Optional[str] = None,
@@ -555,6 +674,9 @@ def run(
     sdg_object_store_data_key: typing.Optional[str] = None,
     sdg_object_store_verify_tls: typing.Optional[bool] = None,
     sdg_object_store_secret: typing.Optional[str] = None,
+    force_pull: typing.Optional[bool] = False,
+    training_1_epoch_num: int = 7,
+    training_2_epoch_num: int = 10,
 ):
     """
     Execute the distributed training on Kubernetes.
@@ -567,6 +689,9 @@ def run(
         storage_class (str): The storage class to use for the PersistentVolumeClaim. For SDG only.
         serving_endpoint (str): The serving endpoint for SDG. For SDG only.
         serving_model (str): The serving model for SDG. For SDG only.
+        judge_serving_endpoint (str): The serving endpoint for evaluation. For Evaluation only.
+        judge_serving_model_name (str): The serving model name for evaluation. For Evaluation only. 
+ judge_serving_model_api_key (str): The serving model API key for evaluation. For Evaluation only. nproc_per_node (int): The number of processes per node. For training only. eval_type (str): The type of evaluation to run. training_phase (str): The type of training phase to run. @@ -579,6 +704,9 @@ def run( sdg_object_store_data_key (str): The name of the tarball that contains SDG data. sdg_object_store_verify_tls (bool): Verify TLS for the object store. sdg_object_store_secret (str): The name of the Kubernetes Secret containing the SDG object store credentials. The namespace is inferred from the namespace option. + force_pull (bool): Force pull the data (sdg data and model) from the object store even if it already exists in the PVC. + training_1_epoch_num (int): Number of epochs to train the model for during phase 1. + training_2_epoch_num (int): Number of epochs to train the model for during phase 2. Returns: None @@ -591,6 +719,9 @@ def run( ctx.obj["storage_class"] = storage_class ctx.obj["serving_endpoint"] = serving_endpoint ctx.obj["serving_model"] = serving_model + ctx.obj["judge_serving_endpoint"] = judge_serving_endpoint + ctx.obj["judge_serving_model_name"] = judge_serving_model_name + ctx.obj["judge_serving_model_api_key"] = judge_serving_model_api_key ctx.obj["nproc_per_node"] = nproc_per_node ctx.obj["eval_type"] = eval_type ctx.obj["training_phase"] = training_phase @@ -603,6 +734,9 @@ def run( ctx.obj["sdg_object_store_data_key"] = sdg_object_store_data_key ctx.obj["sdg_object_store_verify_tls"] = sdg_object_store_verify_tls ctx.obj["sdg_object_store_secret"] = sdg_object_store_secret + ctx.obj["force_pull"] = force_pull + ctx.obj["training_1_epoch_num"] = training_1_epoch_num + ctx.obj["training_2_epoch_num"] = training_2_epoch_num ########################## # MAIN WORKFLOW SEQUENCE # @@ -632,11 +766,19 @@ def run( # ctx.obj["model_to_train"] = best_model.get("model") # Training Phase 2 - # ctx.invoke(train) + ctx.obj["training_phase"] = "2" + ctx.invoke(train) # Evaluation of phase 2 with MT-Bench - # ctx.obj["eval_type"] = "mt-bench" - # _ = ctx.invoke(evaluation) + ctx.obj["eval_type"] = "mt-bench" + scores = ctx.invoke(evaluation) + scores = json.loads(scores) + best_model = max(scores, key=lambda x: x["average_score"]) + logger.info("Best model: %s", best_model.get("model")) + ctx.obj["candidate_model"] = best_model.get("model") + + # Final evaluation + # TODO def get_security_context() -> kubernetes.client.V1SecurityContext: @@ -649,19 +791,27 @@ def get_security_context() -> kubernetes.client.V1SecurityContext: ) -def get_sdg_vol_mount() -> kubernetes.client.V1VolumeMount: +def get_vol_mount() -> list[kubernetes.client.V1VolumeMount]: """ Get the volume mount for the SDG job. """ return [ kubernetes.client.V1VolumeMount( - name=SDG_VOLUME_NAME, mount_path=SDG_PVC_MOUNT_PATH - ), - kubernetes.client.V1VolumeMount( - name=MODEL_VOLUME_NAME, mount_path=MODEL_PVC_MOUNT_PATH + name=DATA_VOLUME_NAME, mount_path=DATA_PVC_MOUNT_PATH ), - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH + ] + + +def get_vol() -> list[kubernetes.client.V1Volume]: + """ + Get the volume for the SDG job. + """ + return [ + kubernetes.client.V1Volume( + name=DATA_VOLUME_NAME, + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=DATA_PVC_NAME + ), ), ] @@ -696,6 +846,134 @@ def create_sdg_job( kubernetes.client.V1Job: A Kubernetes Job object configured with the specified parameters. 
""" # Configureate Pod template container + exec_sdg_op_command = """ +from typing import * + +def sdg_op( + num_instructions_to_generate: int, + taxonomy: str, + sdg: str, + repo_branch: Optional[str], + repo_pr: Optional[int], +): + from os import getenv + + import openai + from instructlab.sdg import generate_data + from instructlab.sdg.utils.taxonomy import read_taxonomy + + api_key = getenv("api_key") + model = getenv("model") + endpoint = getenv("endpoint") + client = openai.OpenAI(base_url=endpoint, api_key=api_key) + + taxonomy_base = "main" if repo_branch or (repo_pr and int(repo_pr) > 0) else "empty" + + print("Generating syntetic dataset for:") + print() + print(read_taxonomy(taxonomy, taxonomy_base)) + + # generate_data has a magic word for its taxonomy_base argument - 'empty' + # it allows generating from the whole repo, see: + # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230 + generate_data( + client=client, + num_instructions_to_generate=num_instructions_to_generate, + output_dir=sdg, + taxonomy=taxonomy, + taxonomy_base=taxonomy_base, + model_name=model, + chunk_word_count=1000, + server_ctx_size=4096, + ) +""" + exec_sdg_op_args = """ +sdg_op(num_instructions_to_generate=2, repo_branch="", repo_pr="", taxonomy="/data/taxonomy", sdg="/data/generated") +""" + + exec_huggingface_importer_op_command = """ +from typing import * + +def huggingface_importer_op(model: str, repo_name: str): + from huggingface_hub import snapshot_download + + snapshot_download(repo_id=repo_name, cache_dir="/tmp", local_dir=model) +""" + exec_huggingface_importer_op_args = """ +huggingface_importer_op(repo_name="ibm-granite/granite-7b-base", model="/data/model") +""" + + exec_data_processing_op_command = """ +from typing import * + +def data_processing_op( + sdg: str, + processed_data: str, + model: str, + max_seq_len: Optional[int] = 4096, + max_batch_len: Optional[int] = 20000, +): + import os + + import instructlab.training.data_process as dp + from instructlab.training import ( + DataProcessArgs, + TrainingArgs, + ) + + # define training-specific arguments + training_args = TrainingArgs( + # define data-specific arguments + model_path=model, + data_path=f"{sdg}/*_train_msgs*.jsonl", + data_output_dir=processed_data, + # define model-trianing parameters + max_seq_len=max_seq_len, + max_batch_len=max_batch_len, + # XXX(shanand): We don't need the following arguments + # for data processing. Added them for now to avoid + # Pydantic validation errors for TrainingArgs + ckpt_output_dir="data/saved_checkpoints", + num_epochs=2, + effective_batch_size=3840, + save_samples=0, + learning_rate=2e-6, + warmup_steps=800, + is_padding_free=True, + ) + + def data_processing(train_args: TrainingArgs) -> None: + # early validation logic here + if train_args.max_batch_len < train_args.max_seq_len: + raise ValueError( + f"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=} < {train_args.max_seq_len=}" + ) + + # process the training data + if not os.path.exists(train_args.data_output_dir): + os.makedirs(train_args.data_output_dir, exist_ok=True) + dp.main( + DataProcessArgs( + # XXX(osilkin): make a decision here, either: + # 1. the CLI is fully responsible for managing where the data is written + # 2. we never cache it and simply write it to a tmp file every time. 
+ # + # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux + # where the user has a defined place for new temporary data to be written. + data_output_path=train_args.data_output_dir, + model_path=train_args.model_path, + data_path=train_args.data_path, + max_seq_len=train_args.max_seq_len, + chat_tmpl_path=train_args.chat_tmpl_path, + ) + ) + + data_processing(train_args=training_args) +""" + exec_data_processing_op_args = """ +data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/data/data", model="/data/model", processed_data="/data/processed_data") +""" + init_containers = [ kubernetes.client.V1Container( name="sdg-op-fetch-taxonomy-data", @@ -704,28 +982,21 @@ def create_sdg_job( args=[ 'git clone {exec_git_clone_op_repo_url} {TAXONOMY_PATH} && cd {TAXONOMY_PATH} && if [ -n "{exec_git_clone_op_repo_branch}" ]; then git fetch origin {exec_git_clone_op_repo_branch} && git checkout {exec_git_clone_op_repo_branch}; elif [ -n "{exec_git_clone_op_repo_pr}" ] && [ {exec_git_clone_op_repo_pr} -gt 0 ]; then git fetch origin pull/{exec_git_clone_op_repo_pr}/head:{exec_git_clone_op_repo_pr} && git checkout {exec_git_clone_op_repo_pr}; fi ' ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), kubernetes.client.V1Container( name="sdg-op-generate-synthetic-data", - image="quay.io/tcoufal/ilab-sdg:latest", - command=[ - "sh", - "-c", - '\nif ! [ -x "$(command -v pip)" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location \'kfp==2.9.0\' \'--no-deps\' \'typing-extensions>=3.7.4,<5; python_version<"3.9"\' && "$0" "$@"\n', - "sh", - "-ec", - 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef sdg_op(\n num_instructions_to_generate: int,\n taxonomy: dsl.Input[dsl.Dataset],\n sdg: dsl.Output[dsl.Dataset],\n repo_branch: Optional[str],\n repo_pr: Optional[int],\n):\n from os import getenv\n\n import openai\n from instructlab.sdg import generate_data\n from instructlab.sdg.utils.taxonomy import read_taxonomy\n\n api_key = getenv("api_key")\n model = getenv("model")\n endpoint = getenv("endpoint")\n client = openai.OpenAI(base_url=endpoint, api_key=api_key)\n\n taxonomy_base = "main" if repo_branch or (repo_pr and int(repo_pr) > 0) else "empty"\n\n print("Generating syntetic dataset for:")\n print()\n print(read_taxonomy(taxonomy.path, taxonomy_base))\n\n # generate_data has a magic word for its taxonomy_base argument - `empty`\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n generate_data(\n client=client,\n num_instructions_to_generate=num_instructions_to_generate,\n output_dir=sdg.path,\n taxonomy=taxonomy.path,\n taxonomy_base=taxonomy_base,\n model_name=model,\n chunk_word_count=1000,\n server_ctx_size=4096,\n )\n\n', - ], + # image="quay.io/tcoufal/ilab-sdg:latest", + image="registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.1-1724960989", + command=["/bin/sh", "-ce"], args=[ - "--executor_input", - '{"inputs": {"parameterValues": {"num_instructions_to_generate": 2, "repo_branch": "", 
"repo_pr": ""}, "artifacts": {"taxonomy": {"artifacts": [{"name": "taxonomy", "uri": "/input_data/taxonomy"}]}}}, "outputs": {"outputFile": "/tmp/kfp_outputs/output_metadata.json", "artifacts": {"sdg": {"artifacts": [{"name": "sdg", "uri": "/input_data/generated"}]}}}}', - "--function_to_execute", - "sdg_op", + PYTHON_EXECUTOR.format( + python_code=exec_sdg_op_command, + python_main=exec_sdg_op_args.strip(), + ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -739,22 +1010,14 @@ def create_sdg_job( kubernetes.client.V1Container( name="huggingface-importer-op", image="registry.access.redhat.com/ubi9/python-311:latest", - command=[ - "sh", - "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && python3 -m pip install --quiet --no-warn-script-location 'huggingface_hub' && \"$0\" \"$@\"\n", - "sh", - "-ec", - 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef huggingface_importer_op(model: dsl.Output[dsl.Model], repo_name: str):\n from huggingface_hub import snapshot_download\n\n snapshot_download(repo_id=repo_name, cache_dir="/tmp", local_dir=model.path)\n\n', - ], + command=["/bin/sh", "-ce"], args=[ - "--executor_input", - '{"inputs": {"parameterValues": {"repo_name": "ibm-granite/granite-7b-base"}}, "outputs": {"outputFile": "/tmp/kfp_outputs/output_metadata.json", "artifacts": {"model": {"artifacts": [{"name": "model", "uri": "/input_model"}]}}}}', - "--function_to_execute", - "huggingface_importer_op", + PYTHON_EXECUTOR.format( + python_code=exec_huggingface_importer_op_command, + python_main=exec_huggingface_importer_op_args.strip(), + ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -768,22 +1031,14 @@ def create_sdg_job( kubernetes.client.V1Container( name="sdg-preprocess", image="registry.access.redhat.com/ubi9/python-311:latest", - command=[ - "sh", - "-c", - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && python3 -m pip install --quiet --no-warn-script-location 'instructlab-training@git+https://github.com/instructlab/training.git' && \"$0\" \"$@\"\n", - "sh", - "-ec", - 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n processed_data: dsl.Output[dsl.Dataset],\n model: dsl.Input[dsl.Artifact],\n max_seq_len: Optional[int] = 4096,\n max_batch_len: Optional[int] = 20000,\n):\n import os\n\n import instructlab.training.data_process as dp\n from instructlab.training import (\n DataProcessArgs,\n TrainingArgs,\n )\n\n # define training-specific arguments\n training_args = TrainingArgs(\n # define data-specific arguments\n model_path=model.path,\n data_path=f"{sdg.path}/*_train_msgs*.jsonl",\n data_output_dir=processed_data.path,\n # define model-trianing parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n # XXX(shanand): We don\'t need the following arguments\n # for data processing. Added them for now to avoid\n # Pydantic validation errors for TrainingArgs\n ckpt_output_dir="data/saved_checkpoints",\n num_epochs=2,\n effective_batch_size=3840,\n save_samples=0,\n learning_rate=2e-6,\n warmup_steps=800,\n is_padding_free=True,\n )\n\n def data_processing(train_args: TrainingArgs) -> None:\n # early validation logic here\n if train_args.max_batch_len < train_args.max_seq_len:\n raise ValueError(\n f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}"\n )\n\n # process the training data\n if not os.path.exists(train_args.data_output_dir):\n os.makedirs(train_args.data_output_dir, exist_ok=True)\n dp.main(\n DataProcessArgs(\n # XXX(osilkin): make a decision here, either:\n # 1. the CLI is fully responsible for managing where the data is written\n # 2. 
we never cache it and simply write it to a tmp file every time.\n #\n # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux\n # where the user has a defined place for new temporary data to be written.\n data_output_path=train_args.data_output_dir,\n model_path=train_args.model_path,\n data_path=train_args.data_path,\n max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n )\n )\n\n data_processing(train_args=training_args)\n\n', - ], + command=["/bin/sh", "-ce"], args=[ - "--executor_input", - '{"inputs": {"parameterValues": {"max_seq_len": 4096, "max_batch_len": 20000}, "artifacts": {"sdg": {"artifacts": [{"name": "sdg", "uri": "/input_data/generated"}]}, "model": {"artifacts": [{"name": "model", "uri": "/input_model"}]}}}, "outputs": {"outputFile": "/tmp/kfp_outputs/output_metadata.json", "artifacts": {"processed_data": {"artifacts": [{"name": "processed_data", "uri": "/input_data/processed_data"}]}}}}', - "--function_to_execute", - "data_processing_op", + PYTHON_EXECUTOR.format( + python_code=exec_data_processing_op_command, + python_main=exec_data_processing_op_args.strip(), + ), ], - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), ] @@ -805,30 +1060,13 @@ def create_sdg_job( name="copy-model-to-pvc", image=TOOLBOX_IMAGE, command=["/bin/sh", "-c"], - args=[f"cp -r -v {MODEL_PVC_MOUNT_PATH} {TRAINING_PVC_MOUNT_PATH}"], - volume_mounts=get_sdg_vol_mount(), + args=[ + f"cp -r -v {DATA_PVC_MOUNT_PATH} {DATA_PVC_MOUNT_PATH}" + ], # TODO: fix me, dumb line to pass linter, this feat is unused anyway + volume_mounts=get_vol_mount(), ) - volumes = [ - kubernetes.client.V1Volume( - name=SDG_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=SDG_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=MODEL_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=MODEL_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] + volumes = get_vol() # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( @@ -861,6 +1099,7 @@ def create_sdg_data_fetch_job( namespace: str, job_name: str, sdg_object_store_secret: str, + force_pull: bool = False, ) -> kubernetes.client.V1Job: """ Create a Kubernetes Job object. @@ -876,104 +1115,188 @@ def create_sdg_data_fetch_job( kubernetes.client.V1Job: A Kubernetes Job object configured with the specified parameters. 
""" - container = kubernetes.client.V1Container( - name="fetch-sdg-files-from-object-store", - image=PYTHON_IMAGE, - command=["/bin/sh", "-c"], - args=[ - SDG_DATA_SCRIPT.format( - strategy="download", SDG_PVC_MOUNT_PATH=SDG_PVC_MOUNT_PATH + exec_data_processing_op_command = """ +from typing import * + +def data_processing_op( + sdg: str, + processed_data: str, + model: str, + max_seq_len: Optional[int] = 4096, + max_batch_len: Optional[int] = 20000, +): + import os + + import instructlab.training.data_process as dp + from instructlab.training import ( + DataProcessArgs, + TrainingArgs, + ) + + # define training-specific arguments + training_args = TrainingArgs( + # define data-specific arguments + model_path=model, + data_path=f"{sdg}/*_train_msgs*.jsonl", + data_output_dir=processed_data, + # define model-trianing parameters + max_seq_len=max_seq_len, + max_batch_len=max_batch_len, + # XXX(shanand): We don't need the following arguments + # for data processing. Added them for now to avoid + # Pydantic validation errors for TrainingArgs + ckpt_output_dir="data/saved_checkpoints", + num_epochs=2, + effective_batch_size=3840, + save_samples=0, + learning_rate=2e-6, + warmup_steps=800, + is_padding_free=True, + ) + + def data_processing(train_args: TrainingArgs) -> None: + # early validation logic here + if train_args.max_batch_len < train_args.max_seq_len: + raise ValueError( + f"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=} < {train_args.max_seq_len=}" ) - ], - volume_mounts=get_sdg_vol_mount(), - env=[ - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ENDPOINT", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="endpoint", optional=True - ) + + # process the training data + if not os.path.exists(train_args.data_output_dir): + os.makedirs(train_args.data_output_dir, exist_ok=True) + dp.main( + DataProcessArgs( + # XXX(osilkin): make a decision here, either: + # 1. the CLI is fully responsible for managing where the data is written + # 2. we never cache it and simply write it to a tmp file every time. + # + # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux + # where the user has a defined place for new temporary data to be written. 
+ data_output_path=train_args.data_output_dir, + model_path=train_args.model_path, + data_path=train_args.data_path, + max_seq_len=train_args.max_seq_len, + chat_tmpl_path=train_args.chat_tmpl_path, + ) + ) + + data_processing(train_args=training_args) +""" + exec_data_processing_op_args = """ +data_processing_op(max_seq_len=4096, max_batch_len=20000, sdg="/data/data", model="/data/model", processed_data="/data/processed_data") +""" + + init_containers = [ + kubernetes.client.V1Container( + name="fetch-sdg-files-from-object-store", + image=DS_IMAGE, + command=["/bin/sh", "-c"], + args=[ + DATA_SCRIPT.format( + strategy="download", + force_pull=force_pull, + data_pvc_mount_path=DATA_PVC_MOUNT_PATH, + ) + ], + volume_mounts=get_vol_mount(), + env=[ + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ENDPOINT", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="endpoint", optional=True + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_BUCKET", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="bucket", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_BUCKET", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="bucket", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ACCESS_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="access_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ACCESS_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="access_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_SECRET_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="secret_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_SECRET_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="secret_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_REGION", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="region", optional=True - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_REGION", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="region", optional=True + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_DATA_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="data_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_DATA_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="data_key", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_VERIFY_TLS", - value_from=kubernetes.client.V1EnvVarSource( - 
secret_key_ref=kubernetes.client.V1SecretKeySelector(
-                        name=sdg_object_store_secret, key="verify_tls", optional=True
-                    )
+                kubernetes.client.V1EnvVar(
+                    name="SDG_OBJECT_STORE_VERIFY_TLS",
+                    value_from=kubernetes.client.V1EnvVarSource(
+                        secret_key_ref=kubernetes.client.V1SecretKeySelector(
+                            name=sdg_object_store_secret,
+                            key="verify_tls",
+                            optional=True,
+                        )
+                    ),
                 ),
-            ),
-        ],
-    )
+            ],
+        )
+    ]
 
-    volumes = [
-        kubernetes.client.V1Volume(
-            name=SDG_VOLUME_NAME,
-            persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(
-                claim_name=SDG_PVC_NAME
+    container = kubernetes.client.V1Container(
+        name="sdg-preprocess",
+        # image="quay.io/tcoufal/ilab-sdg:latest",
+        image=RHELAI_IMAGE,
+        command=["/bin/sh", "-ce"],
+        args=[
+            PYTHON_EXECUTOR.format(
+                python_code=exec_data_processing_op_command,
+                python_main=exec_data_processing_op_args.strip(),
             ),
-        ),
-        kubernetes.client.V1Volume(
-            name=MODEL_VOLUME_NAME,
-            persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(
-                claim_name=MODEL_PVC_NAME
+        ],
+        volume_mounts=get_vol_mount(),
+        security_context=get_security_context(),
+        env_from=[
+            kubernetes.client.V1EnvFromSource(
+                config_map_ref=kubernetes.client.V1ConfigMapEnvSource(name=K8S_NAME)
             ),
-        ),
-        kubernetes.client.V1Volume(
-            name=TRAINING_VOLUME_NAME,
-            persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(
-                claim_name=TRAINING_PVC_NAME
+            kubernetes.client.V1EnvFromSource(
+                secret_ref=kubernetes.client.V1SecretEnvSource(name=K8S_NAME)
             ),
-        ),
-    ]
+        ],
+    )
 
     # Create and configure a spec section
     template = kubernetes.client.V1PodTemplateSpec(
         metadata=kubernetes.client.V1ObjectMeta(labels={"app": "sdg-data-fetch"}),
         spec=kubernetes.client.V1PodSpec(
             restart_policy="Never",
+            init_containers=init_containers,
             containers=[container],
-            volumes=volumes,
+            volumes=get_vol(),
         ),
     )
 
@@ -1036,64 +1359,241 @@ def create_eval_job(
     #             ),
     #         ],
     #     )
+
+    exec_run_mt_bench_op_command = """
+from typing import *
+
+def run_mt_bench_op(
+    models_path_prefix: str,
+    mt_bench_output: str,
+    merge_system_user_message: bool,
+    # generate_answers,judgment uses a magic word for its mt_bench evaluator - 'auto'
+    # with 'auto', number of gpus allocated for serving is calculated based on environment
+    # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36
+    max_workers: str,
+    models_list: List[str] = None,
+    models_folder: Optional[str] = None,
+    device: str = None,
+) -> NamedTuple("outputs", best_model=str, best_score=float):
+    import json
+    import multiprocessing
+    import os
+
+    import torch
+    from instructlab.eval.mt_bench import MTBenchEvaluator
+
+    VLLM_SERVER = "http://localhost:8000/v1"
+
+    def launch_vllm(
+        model_path: str, gpu_count: int, retries: int = 120, delay: int = 5
+    ):
+        import subprocess
+        import sys
+        import time
+
+        import requests
+
+        if gpu_count > 0:
+            command = [
+                sys.executable,
+                "-m",
+                "vllm.entrypoints.openai.api_server",
+                "--model",
+                model_path,
+                "--tensor-parallel-size",
+                str(gpu_count),
+            ]
+        else:
+            command = [
+                sys.executable,
+                "-m",
+                "vllm.entrypoints.openai.api_server",
+                "--model",
+                model_path,
+            ]
+
+        subprocess.Popen(args=command)
+
+        print(f"Waiting for vLLM server to start at {VLLM_SERVER}...")
+
+        for attempt in range(retries):
+            try:
+                response = requests.get(f"{VLLM_SERVER}/models")
+                if response.status_code == 200:
+                    print(f"vLLM server is up and running at {VLLM_SERVER}.")
+                    return
+            except requests.ConnectionError:
+                pass
+
+            print(
+                f"Server not available yet, retrying in {delay} 
seconds (Attempt {attempt + 1}/{retries})..." + ) + time.sleep(delay) + + raise RuntimeError( + f"Failed to start vLLM server at {VLLM_SERVER} after {retries} retries." + ) + + # This seems like excessive effort to stop the vllm process, but merely saving & killing the pid doesn't work + # Also, the base image does not include 'pkill' cmd, so can't pkill -f vllm.entrypoints.openai.api_server either + def stop_vllm(): + import psutil + + for process in psutil.process_iter(attrs=["pid", "name", "cmdline"]): + cmdline = process.info.get("cmdline") + if cmdline and "vllm.entrypoints.openai.api_server" in cmdline: + print( + f"Found vLLM server process with PID: {process.info['pid']}, terminating..." + ) + try: + process.terminate() # Try graceful termination + process.wait(timeout=5) # Wait a bit for it to terminate + if process.is_running(): + print( + f"Forcefully killing vLLM server process with PID: {process.info['pid']}" + ) + process.kill() # Force kill if it's still running + print( + f"Successfully stopped vLLM server with PID: {process.info['pid']}" + ) + except psutil.NoSuchProcess: + print(f"Process with PID {process.info['pid']} no longer exists.") + except psutil.AccessDenied: + print( + f"Access denied when trying to terminate process with PID {process.info['pid']}." + ) + except Exception as e: + print( + f"Failed to terminate process with PID {process.info['pid']}. Error: {e}" + ) + + os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" + + gpu_available = torch.cuda.is_available() + gpu_name = ( + torch.cuda.get_device_name(torch.cuda.current_device()) + if gpu_available + else "No GPU available" + ) + gpu_count = torch.cuda.device_count() if gpu_available else 0 + + print(f"GPU Available: {gpu_available}, {gpu_name}") + + if models_list is None and models_folder: + models_list = os.listdir(models_folder) + + judge_api_key = os.getenv("JUDGE_API_KEY", "") + judge_model_name = os.getenv("JUDGE_NAME") + judge_endpoint = os.getenv("JUDGE_ENDPOINT") + + scores = {} + all_mt_bench_data = [] + + # generate_answers,judgment uses a magic word for its mt_bench evaluator - 'auto' + # with 'auto', number of gpus allocated for serving is calculated based on environment + # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 + if max_workers == "auto": + try: + usable_cpu_count = len(os.sched_getaffinity(0)) // 2 + except AttributeError: + usable_cpu_count = multiprocessing.cpu_count() // 2 + max_workers = usable_cpu_count + + for model_name in models_list: + print(f"Serving candidate model: {model_name}") + model_path = f"{models_path_prefix}/{model_name}" + + launch_vllm(model_path, gpu_count) + + # model ID is the model_path value in vLLM + evaluator = MTBenchEvaluator( + model_name=model_path, + judge_model_name=judge_model_name, + output_dir="/tmp/eval_output", + merge_system_user_message=merge_system_user_message, + ) + + evaluator.gen_answers( + server_url=VLLM_SERVER, + serving_gpus=gpu_count, + max_workers=max_workers, + ) + + stop_vllm() + + overall_score, qa_pairs, turn_scores, error_rate = evaluator.judge_answers( + server_url=judge_endpoint, + api_key=judge_api_key, + serving_gpus=gpu_count, + max_workers=max_workers, + ) + + mt_bench_data = { + "report_title": "SKILLS EVALUATION REPORT", + "model": model_path, + "judge_model": judge_model_name, + "overall_score": overall_score, + "turn_scores": turn_scores, + "qa_scores": qa_pairs, + "error_rate": error_rate, + } + + all_mt_bench_data.append(mt_bench_data) + scores[model_path] = 
overall_score + + with open(mt_bench_output, "w") as f: + json.dump(all_mt_bench_data, f, indent=4) + + outputs = NamedTuple("outputs", best_model=str, best_score=float) + best_model = max(scores, key=scores.get) + best_score = scores[best_model] + return outputs(best_model=best_model, best_score=best_score) +""" + exec_run_mt_bench_op_args = """ +run_mt_bench_op(mt_bench_output="/data/mt-bench-results.txt", models_folder="/data/model/output/hf_format", models_path_prefix="/data/model/output/hf_format", max_workers="auto", merge_system_user_message=False) +""" + if eval_type == "mt-bench": init_containers = [ kubernetes.client.V1Container( name=f"run-eval-{eval_type}", - image="quay.io/sallyom/instructlab-ocp:eval-10-8", - command=[ - "sh", - "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && python3 -m pip install --quiet --no-warn-script-location 'vllm' && \"$0\" \"$@\"\n", - "sh", - "-ec", - 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef run_mt_bench_op(\n models_path_prefix: str,\n mt_bench_output: Output[Artifact],\n merge_system_user_message: bool,\n # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`, number of gpus allocated for serving is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n max_workers: str,\n models_list: List[str] = None,\n models_folder: Optional[str] = None,\n device: str = None,\n) -> NamedTuple("outputs", best_model=str, best_score=float):\n import json\n import os\n\n import torch\n from helpers import (\n VLLM_SERVER,\n launch_vllm,\n stop_vllm,\n )\n from instructlab.eval.mt_bench import MTBenchEvaluator\n\n os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"\n\n gpu_available = torch.cuda.is_available()\n gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n if gpu_available\n else "No GPU available"\n )\n gpu_count = torch.cuda.device_count() if gpu_available else 0\n\n print(f"GPU Available: {gpu_available}, {gpu_name}")\n\n if models_list is None and models_folder:\n models_list = os.listdir(models_folder)\n\n judge_api_key = os.getenv("JUDGE_API_KEY", "")\n judge_model_name = os.getenv("JUDGE_NAME")\n judge_endpoint = os.getenv("JUDGE_ENDPOINT")\n\n scores = {}\n all_mt_bench_data = []\n\n # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`, number of gpus allocated for serving is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n if max_workers == "auto":\n try:\n usable_cpu_count = len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n usable_cpu_count = multiprocessing.cpu_count() // 2\n max_workers = usable_cpu_count\n\n for model_name in models_list:\n print(f"Serving candidate model: {model_name}")\n model_path = f"{models_path_prefix}/{model_name}"\n\n launch_vllm(model_path, gpu_count)\n\n # model ID is the model_path value in vLLM\n evaluator = MTBenchEvaluator(\n 
model_name=model_path,\n judge_model_name=judge_model_name,\n output_dir="/tmp/eval_output",\n merge_system_user_message=merge_system_user_message,\n )\n\n evaluator.gen_answers(\n server_url=VLLM_SERVER,\n serving_gpus=gpu_count,\n max_workers=max_workers,\n )\n\n stop_vllm()\n\n overall_score, qa_pairs, turn_scores, error_rate = evaluator.judge_answers(\n server_url=judge_endpoint,\n api_key=judge_api_key,\n serving_gpus=gpu_count,\n max_workers=max_workers,\n )\n\n mt_bench_data = {\n "report_title": "SKILLS EVALUATION REPORT",\n "model": model_path,\n "judge_model": judge_model_name,\n "overall_score": overall_score,\n "turn_scores": turn_scores,\n "qa_scores": qa_pairs,\n "error_rate": error_rate,\n }\n\n all_mt_bench_data.append(mt_bench_data)\n scores[model_path] = overall_score\n\n with open(mt_bench_output.path, "w") as f:\n json.dump(all_mt_bench_data, f, indent=4)\n\n outputs = NamedTuple("outputs", best_model=str, best_score=float)\n best_model = max(scores, key=scores.get)\n best_score = scores[best_model]\n return outputs(best_model=best_model, best_score=best_score)\n\n', - ], + image=RHELAI_IMAGE, + command=["/bin/sh", "-ce"], args=[ - "--executor_input", - '{"inputs": {"parameterValues": {"models_path_prefix": "/output/model/hf_format", "merge_system_user_message": false, "max_workers": "auto"}}, "outputs": {"outputFile": "/tmp/kfp_outputs/output_metadata.json", "artifacts": {"mt_bench_output": {"artifacts": [{"name": "mt_bench_output", "uri": "/output/mt-bench-results.txt"}]}}}}', - "--function_to_execute", - "run_mt_bench_op", + PYTHON_EXECUTOR.format( + python_code=exec_run_mt_bench_op_command, + python_main=exec_run_mt_bench_op_args.strip(), + ), ], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH + volume_mounts=get_vol_mount(), + security_context=get_security_context(), + env_from=[ + kubernetes.client.V1EnvFromSource( + secret_ref=kubernetes.client.V1SecretEnvSource( + name=JUDGE_SERVING_NAME + ) ), ], ) ] container = kubernetes.client.V1Container( name=f"output-eval-{eval_type}-scores", - image="quay.io/sallyom/instructlab-ocp:eval-10-8", + image=RHELAI_IMAGE, command=["/bin/sh", "-c"], args=[f"cat {MT_BENCH_SCORES_PATH}"], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH - ), - ], + security_context=get_security_context(), + volume_mounts=get_vol_mount(), ) else: raise ValueError(f"Unknown evaluation type: {eval_type}") - volumes = [ - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] - # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( - metadata=kubernetes.client.V1ObjectMeta(labels={"app": "eval"}), + metadata=kubernetes.client.V1ObjectMeta(labels={"app": f"eval-{eval_type}"}), spec=kubernetes.client.V1PodSpec( restart_policy="Never", init_containers=init_containers, containers=[container], - volumes=volumes, + volumes=get_vol(), ), ) @@ -1149,6 +1649,7 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: # Wait for the job to complete w = kubernetes.watch.Watch() + pod_log = None for event in w.stream(batch_v1.list_namespaced_job, namespace=namespace): job_event = event["object"] if job_event.metadata.name != job.metadata.name: @@ -1162,6 +1663,8 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: 
job.spec.template.metadata.labels["app"]
                 ),
             )
+            # On success return the logs of the last pod which contains the output
+            # (useful to get eval scores)
             pod_log = core_v1.read_namespaced_pod_log(
                 name=pods.items[0].metadata.name, namespace=namespace
             )
@@ -1274,25 +1777,11 @@ def sdg(
     # list of PVCs to create and their details
     pvcs = [
         {
-            "name": SDG_PVC_NAME,
+            "name": DATA_PVC_NAME,
             "namespace": namespace,
             "storage_class": storage_class,
             "access_modes": ["ReadWriteOnce"],
-            "size": "1Gi",
-        },
-        {
-            "name": MODEL_PVC_NAME,
-            "namespace": namespace,
-            "storage_class": storage_class,
-            "access_modes": ["ReadWriteOnce"],
-            "size": "50Gi",
-        },
-        {
-            "name": TRAINING_PVC_NAME,
-            "namespace": namespace,
-            "storage_class": storage_class,
-            "access_modes": ["ReadWriteMany"],
-            "size": "50Gi",
+            "size": "200Gi",
         },
     ]
     for pvc in pvcs:
@@ -1300,7 +1789,7 @@ def sdg(
             v1.create_namespaced_persistent_volume_claim(
                 namespace=namespace, body=create_pvc(**pvc)
             )
-            logger.info("Successfully creayed PVC '%s' created.", pvc.get("name"))
+            logger.info("Successfully created PVC '%s'.", pvc.get("name"))
         except kubernetes.client.rest.ApiException as exc:
             if exc.status == 409:
                 logger.info("PVC '%s' already exists.", pvc["name"])
@@ -1374,6 +1863,9 @@ def sdg_data_fetch(
     # Populate variables from context
     namespace = ctx.obj["namespace"]
     storage_class = ctx.obj["storage_class"]
+    judge_serving_endpoint = ctx.obj["judge_serving_endpoint"]
+    judge_serving_model_name = ctx.obj["judge_serving_model_name"]
+    judge_serving_model_api_key = ctx.obj["judge_serving_model_api_key"]
     sdg_object_store_endpoint = ctx.obj["sdg_object_store_endpoint"]
     sdg_object_store_bucket = ctx.obj["sdg_object_store_bucket"]
     sdg_object_store_access_key = ctx.obj["sdg_object_store_access_key"]
@@ -1382,6 +1874,10 @@ def sdg_data_fetch(
     sdg_object_store_data_key = ctx.obj["sdg_object_store_data_key"]
     sdg_object_store_verify_tls = ctx.obj["sdg_object_store_verify_tls"]
     sdg_object_store_secret = ctx.obj["sdg_object_store_secret"]
+    force_pull = ctx.obj["force_pull"]
+
+    # Make sure the endpoint is a valid URL
+    validate_url(judge_serving_endpoint)
 
     # Check if all required arguments are provided
     if not sdg_object_store_secret:
@@ -1478,28 +1974,35 @@ def decode_base64(data):
         "'bucket', 'access_key', 'secret_key', 'data_key'.",
     )
 
+    # Create Secret config details for evaluation
+    judge_serving_details_secret = JUDGE_SERVING_NAME
+    secret = kubernetes.client.V1Secret(
+        metadata=kubernetes.client.V1ObjectMeta(
+            name=judge_serving_details_secret, namespace=namespace
+        ),
+        string_data={
+            "JUDGE_NAME": judge_serving_model_name,
+            "JUDGE_API_KEY": judge_serving_model_api_key,
+            "JUDGE_ENDPOINT": judge_serving_endpoint,
+        },
+    )
+
+    try:
+        v1.create_namespaced_secret(namespace=namespace, body=secret)
+    except kubernetes.client.rest.ApiException as exc:
+        if exc.status == 409:
+            logger.info("Secret '%s' already exists.", secret.metadata.name)
+        else:
+            raise
+
     # list of PVCs to create and their details
     pvcs = [
         {
-            "name": SDG_PVC_NAME,
-            "namespace": namespace,
-            "storage_class": storage_class,
-            "access_modes": ["ReadWriteOnce"],
-            "size": "1Gi",
-        },
-        {
-            "name": MODEL_PVC_NAME,
-            "namespace": namespace,
-            "storage_class": storage_class,
-            "access_modes": ["ReadWriteOnce"],
-            "size": "50Gi",
-        },
-        {
-            "name": TRAINING_PVC_NAME,
+            "name": DATA_PVC_NAME,
             "namespace": namespace,
             "storage_class": storage_class,
             "access_modes": ["ReadWriteMany"],
-            "size": "50Gi",
+            "size": "200Gi",  # Allocate size for a few models and large SDG data sets
         },
     ]
     for 
pvc in pvcs: @@ -1519,6 +2022,7 @@ def decode_base64(data): namespace=namespace, job_name="sdg-data-fetch", sdg_object_store_secret=sdg_object_store_secret, + force_pull=force_pull, ) # Run the job @@ -1538,27 +2042,35 @@ def train( training_phase = ctx.obj["training_phase"] path_to_model = ctx.obj["model_to_train"] nproc_per_node: int = ctx.obj["nproc_per_node"] + training_1_epoch_num: int = ctx.obj["training_1_epoch_num"] + training_2_epoch_num: int = ctx.obj["training_2_epoch_num"] if training_phase is None: raise ValueError("Training phase must be provided with --training-phase=[1|2]") # During the initial training if path_to_model is None: - path_to_model = "/input_model" + path_to_model = DATA_PVC_MODEL_PATH + + epoch_num = None + if training_phase == "1": + epoch_num = training_1_epoch_num + elif training_phase == "2": + epoch_num = training_2_epoch_num logger.info("Running multi-phased distributed training phase %s", training_phase) worker_replicas = PYTORCH_NNODES - 1 pytorch_training_job_yaml = yaml.safe_load( PYTORCH_TRAINING_JOB.format( - name="train-sdg", - model_pvc_name="model", - input_pvc_name="sdg-data", - output_pvc_name="training-data", + name=f"train-phase-{training_phase}", + data_pvc_name=DATA_PVC_NAME, path_to_model=path_to_model, nproc_per_node=nproc_per_node, - PYTORCH_NNODES=PYTORCH_NNODES, - PYTORCH_IMAGE=PYTORCH_IMAGE, + nnodes=PYTORCH_NNODES, + image=RHELAI_IMAGE, worker_replicas=worker_replicas, + epoch_num=epoch_num, + phase_num=training_phase, ) ) diff --git a/standalone/standalone.tpl b/standalone/standalone.tpl index 6982e7d7..4fe917fe 100755 --- a/standalone/standalone.tpl +++ b/standalone/standalone.tpl @@ -25,6 +25,7 @@ import base64 import json import logging import typing +from os import path from urllib.parse import urlparse import click @@ -45,27 +46,26 @@ logger = logging.getLogger(__name__) DEFAULT_REPO_URL = "https://github.com/instructlab/taxonomy.git" K8S_NAME = "kfp-model-server" TOOLBOX_IMAGE = "registry.access.redhat.com/ubi9/toolbox" -PYTHON_IMAGE = "registry.access.redhat.com/ubi9/python-311:latest" -SDG_PVC_NAME = "sdg-data" -SDG_PVC_MOUNT_PATH = "/input_data" -SDG_VOLUME_NAME = "input-data" -MODEL_PVC_NAME = "model" -MODEL_PVC_MOUNT_PATH = "/input_model" -MODEL_VOLUME_NAME = "model" -TAXONOMY_PATH = SDG_PVC_MOUNT_PATH + "/taxonomy" -TRAINING_PVC_NAME = "training-data" -TRAINING_PVC_MOUNT_PATH = "/output" -TRAINING_VOLUME_NAME = "output" +DS_IMAGE = "quay.io/opendatahub/workbench-images:jupyter-datascience-ubi9-python-3.11-20241004-609ffb8" +RHELAI_IMAGE = "registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.2" +DATA_PVC_NAME = "data" +DATA_PVC_MOUNT_PATH = "/data" +DATA_PVC_MODEL_PATH = path.join(DATA_PVC_MOUNT_PATH, "model") +DATA_VOLUME_NAME = "data" +TAXONOMY_PATH = path.join(DATA_PVC_MOUNT_PATH, "taxonomy") +DATA_PVC_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "output") +DATA_PVC_OUTPUT_DATA_PATH = path.join(DATA_PVC_OUTPUT_PATH, "data") PYTORCH_NNODES = 2 -PYTORCH_IMAGE = "quay.io/shanand/test-train:0.0.4" # MMLU_SCORES_PATH = "/output/mmlu-results.txt" -MT_BENCH_SCORES_PATH = "/output/mt-bench-results.txt" +MT_BENCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-results.txt") SDG_OBJECT_STORE_SECRET_NAME = "sdg-object-store-credentials" KFP_MODEL_SERVER_CM = """ # TODO: remove the following line and replace it with the actual ConfigMap/Secret {{kfp_model_server_cm}} """ +JUDGE_SERVING_NAME = "judge-serving-details" + PYTORCH_TRAINING_JOB = """ apiVersion: kubeflow.org/v1 kind: PyTorchJob @@ -85,27 +85,50 @@ spec: 
containers: - args: - | - mkdir -p /output/model; - mkdir -p /output/data; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /output/model --data_output_dir /input_data/processed_data + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" + mkdir -p /data/model; + mkdir -p /data/data; + mkdir -p {path_to_model}/output + export XDG_CACHE_HOME=/tmp + export TRITON_CACHE_DIR=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir={path_to_model}/output \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=1e-4 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' - '--' - image: {PYTORCH_IMAGE} + image: {image} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -116,15 +139,9 @@ spec: cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output + - name: data persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} Worker: replicas: {worker_replicas} restartPolicy: OnFailure @@ -136,27 +153,48 @@ spec: containers: - args: - | + phase_num={phase_num} + echo "Running phase $phase_num" + PATH_TO_MODEL={path_to_model} + if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/hf_format/$(ls --sort=time {path_to_model}/output/hf_format|head -n 1)"; fi + echo "Using $PATH_TO_MODEL model for training" mkdir -p /tmp/model; - python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir /tmp/model --data_output_dir /input_data/processed_data + export TRITON_CACHE_DIR=/tmp + export XDG_CACHE_HOME=/tmp + export HF_HOME=/tmp + export TRANSFORMERS_CACHE=/tmp + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank $(RANK) \ + --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path="$PATH_TO_MODEL" \ + --data_path=/data/processed_data/data.jsonl \ + --output_dir=/tmp/model \ + --num_epochs={epoch_num} \ + --effective_batch_size=3840 \ + --learning_rate=2e-6 \ + --num_warmup_steps=800 \ + --save_samples=0 \ + --log_level=INFO \ + --max_batch_len=20000 \ + --seed=42 \ + --cpu_offload_optimizer \ + --sharding_strategy=FULL_SHARD \ + --is_granite \ + --checkpoint_at_epoch command: - /bin/bash - '-c' - '--' - image: {PYTORCH_IMAGE} + image: {image} name: pytorch volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - 
readOnly: true - - mountPath: /output - name: output - readOnly: true + - mountPath: /data + name: data env: - name: NNODES - value: \"{PYTORCH_NNODES}\" + value: \"{nnodes}\" - name: NPROC_PER_NODE value: \"{nproc_per_node}\" resources: @@ -167,20 +205,26 @@ spec: cpu: 2 "nvidia.com/gpu": {nproc_per_node} volumes: - - name: input-data + - name: data persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} + claimName: {data_pvc_name} """ # TODO: support signature version? -SDG_DATA_SCRIPT = """ +DATA_SCRIPT = """ set -e +FORCE_PULL={force_pull} +if [ -s {data_pvc_mount_path}/data.tar.gz ] && [ -d {data_pvc_mount_path}/data ] && [ -d {data_pvc_mount_path}/model ] ; then + echo "Data tarball and sdg/model directories already exist in the PVC. Skipping download." + if [ "$FORCE_PULL" == "None" ] || [ "$FORCE_PULL" == "False" ]; then + echo "'--force-pull' is not set - will not force pull the data from the object store" + ls -laR {data_pvc_mount_path} + exit 0 + else + echo "'--force-pull' is set to true - will force pull the data from the object store" + fi +fi + export STRATEGY={strategy} if [ -z "$STRATEGY" ] || [ "$STRATEGY" == "None" ]; then @@ -223,7 +267,7 @@ def download_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') - output_file = '{SDG_PVC_MOUNT_PATH}/sdg.tar.gz' + output_file = '{data_pvc_mount_path}/data.tar.gz' s3.download_file(bucket_name, s3_key, output_file) @@ -232,7 +276,7 @@ def upload_s3_file(): bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') # TODO: change the name for the model name - input_file = '{SDG_PVC_MOUNT_PATH}/sdg.tar.gz' # TODO: change for model path + input_file = '{data_pvc_mount_path}/data.tar.gz' # TODO: change for model path s3.upload_file(input_file, bucket_name, s3_key) @@ -249,9 +293,22 @@ EOF python "$tmp"/download_s3.py -if [[ "$STRATEGY" == "download" ]]; then - mkdir -p {SDG_PVC_MOUNT_PATH}/generated - tar -xvf {SDG_PVC_MOUNT_PATH}/sdg.tar.gz -C {SDG_PVC_MOUNT_PATH}/generated +if [ "$STRATEGY" == "download" ]; then + # List top-level directories only (no nested directories) + top_level_dirs=$(tar --exclude='*/*' --list --file {data_pvc_mount_path}/data.tar.gz) + + # Loop through the expected directories and check if they exist in the archive + for dir in data model; do + if ! echo "$top_level_dirs" | grep -q "^$dir/$"; then + echo "Archive does not contain a '$dir' directory" + exit 1 + fi + done + echo "All expected directories are present." 
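+
+  # The archive layout mirrors the --sdg-object-store-data-key help text: it is
+  # built with 'tar -czvf data.tar.gz /path/to/data /path/to/model', so 'data/'
+  # (the SDG output) and 'model/' (the model to train) are its only top-level entries.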
+
+  echo "Extracting data from the archive"
+  tar -C {data_pvc_mount_path} -xvf {data_pvc_mount_path}/data.tar.gz
+  ls -laR {data_pvc_mount_path}
 fi
 """
 
@@ -288,6 +345,23 @@ spec:
           name: {script_configmap}
 """
 
+PYTHON_EXECUTOR = """
+set -e
+export XDG_CACHE_HOME=/tmp
+
+tmp=$(mktemp -d)
+cat <<EOF > "$tmp"/exec.py
+
+{python_code}
+
+if __name__ == "__main__":
+    {python_main}
+
+EOF
+
+python3 "$tmp"/exec.py
+"""
+
 
 @click.group()
 def cli():
@@ -392,9 +466,7 @@ def show(
 
 
 @cli.group(invoke_without_command=True)
-@click.option(
-    "--namespace", type=str, default="default", help="Kubernetes namespace to use"
-)
+@click.option("--namespace", type=str, help="Kubernetes namespace to use")
 @click.option(
     "--taxonomy-repo-url",
     type=str,
@@ -431,6 +503,30 @@ def show(
     help="Serving model for SDG - for SDG only",
     hidden=True,
 )
+@click.option(
+    "--judge-serving-endpoint",
+    type=str,
+    help=(
+        "Serving endpoint for evaluation. "
+        "e.g. http://serving.kubeflow.svc.cluster.local:8080/v1"
+    ),
+    required=True,
+)
+@click.option(
+    "--judge-serving-model-name",
+    type=str,
+    help="The name of the model to use for evaluation.",
+    required=True,
+)
+@click.option(
+    "--judge-serving-model-api-key",
+    type=str,
+    help=(
+        "Serving model API key for evaluation. " "(JUDGE_SERVING_MODEL_API_KEY env var)"
    ),
+    envvar="JUDGE_SERVING_MODEL_API_KEY",
+    required=True,
+)
 @click.option(
     "--nproc-per-node",
     type=int,
@@ -450,7 +546,11 @@ def show(
 )
 @click.option(
     "--model-to-train",
-    help="Path to model to train (PVC filesystem path)",
+    help=(
+        "Path to model to train (PVC filesystem path). "
+        "Useful when calling training phases independently and the user wants to point to the epoch directory. "
+        "Very advanced usage, not recommended for general use."
+    ),
     type=str,
 )
 @click.option(
@@ -492,9 +592,11 @@ def show(
     "--sdg-object-store-data-key",
     envvar="SDG_OBJECT_STORE_DATA_KEY",
     help=(
-        "Name of tarball that contains SDG data. (SDG_OBJECT_STORE_DATA_KEY env var)."
-        "The tarball MUST NOT contain a top-level directory. "
-        "To archive your SDG data, use the following command: cd /path/to/data && tar -czvf sdg.tar.gz *"
+        "Name of tarball that contains SDG data AND model files. (SDG_OBJECT_STORE_DATA_KEY env var). "
+        "The tarball MUST contain two directories: data and model. "
+        "The data directory contains the SDG data. "
+        "The model directory contains the model to train. "
+        "To archive, use the following command: tar -czvf data.tar.gz /path/to/data /path/to/model."
    ),
    type=str,
)
@@ -518,16 +620,33 @@ def show(
     ),
     type=str,
 )
+@click.option(
+    "--force-pull",
+    help="Force pull the data (sdg data and model) from the object store even if it already exists in the PVC.",
+    is_flag=True,
+    default=False,
+)
+@click.option(
+    "--training-1-epoch-num",
+    help="Number of epochs to train the model for during phase 1.",
+    default=7,
+)
+@click.option(
+    "--training-2-epoch-num",
+    help="Number of epochs to train the model for during phase 2.",
+    default=10,
+)
 @click.pass_context
 def run(
     ctx: click.Context,
-    namespace: typing.Optional[str] = "default",
+    namespace: typing.Optional[str] = None,
     taxonomy_repo_url: str = "",
     taxonomy_repo_branch: typing.Optional[str] = "",
     taxonomy_repo_pr: typing.Optional[str] = "",
     storage_class: typing.Optional[str] = None,
     serving_endpoint: typing.Optional[str] = None,
     serving_model: typing.Optional[str] = None,
+    judge_serving_endpoint: typing.Optional[str] = None,
+    judge_serving_model_name: typing.Optional[str] = None,
+    judge_serving_model_api_key: typing.Optional[str] = None,
     nproc_per_node: typing.Optional[int] = 1,
     eval_type: typing.Optional[str] = None,
     training_phase: typing.Optional[str] = None,
@@ -540,6 +659,9 @@ def run(
     sdg_object_store_data_key: typing.Optional[str] = None,
     sdg_object_store_verify_tls: typing.Optional[bool] = None,
     sdg_object_store_secret: typing.Optional[str] = None,
+    force_pull: typing.Optional[bool] = False,
+    training_1_epoch_num: int = 7,
+    training_2_epoch_num: int = 10,
 ):
     """
     Execute the distributed training on Kubernetes.
@@ -552,6 +674,9 @@ def run(
         storage_class (str): The storage class to use for the PersistentVolumeClaim. For SDG only.
         serving_endpoint (str): The serving endpoint for SDG. For SDG only.
         serving_model (str): The serving model for SDG. For SDG only.
+        judge_serving_endpoint (str): The serving endpoint for evaluation. For Evaluation only.
+        judge_serving_model_name (str): The serving model name for evaluation. For Evaluation only.
+        judge_serving_model_api_key (str): The serving model API key for evaluation. For Evaluation only.
         nproc_per_node (int): The number of processes per node. For training only.
         eval_type (str): The type of evaluation to run.
         training_phase (str): The type of training phase to run.
@@ -564,6 +689,9 @@ def run(
         sdg_object_store_data_key (str): The name of the tarball that contains SDG data.
         sdg_object_store_verify_tls (bool): Verify TLS for the object store.
         sdg_object_store_secret (str): The name of the Kubernetes Secret containing the SDG object store credentials. The namespace is inferred from the namespace option.
+        force_pull (bool): Force pull the data (sdg data and model) from the object store even if it already exists in the PVC.
+        training_1_epoch_num (int): Number of epochs to train the model for during phase 1.
+        training_2_epoch_num (int): Number of epochs to train the model for during phase 2.
    Returns:
        None
@@ -576,6 +704,9 @@ def run(
     ctx.obj["storage_class"] = storage_class
     ctx.obj["serving_endpoint"] = serving_endpoint
     ctx.obj["serving_model"] = serving_model
+    ctx.obj["judge_serving_endpoint"] = judge_serving_endpoint
+    ctx.obj["judge_serving_model_name"] = judge_serving_model_name
+    ctx.obj["judge_serving_model_api_key"] = judge_serving_model_api_key
     ctx.obj["nproc_per_node"] = nproc_per_node
     ctx.obj["eval_type"] = eval_type
     ctx.obj["training_phase"] = training_phase
@@ -588,6 +719,9 @@ def run(
     ctx.obj["sdg_object_store_data_key"] = sdg_object_store_data_key
     ctx.obj["sdg_object_store_verify_tls"] = sdg_object_store_verify_tls
     ctx.obj["sdg_object_store_secret"] = sdg_object_store_secret
+    ctx.obj["force_pull"] = force_pull
+    ctx.obj["training_1_epoch_num"] = training_1_epoch_num
+    ctx.obj["training_2_epoch_num"] = training_2_epoch_num
 
     ##########################
     # MAIN WORKFLOW SEQUENCE #
@@ -617,11 +751,19 @@ def run(
         #     ctx.obj["model_to_train"] = best_model.get("model")
 
         # Training Phase 2
-        # ctx.invoke(train)
+        ctx.obj["training_phase"] = "2"
+        ctx.invoke(train)
 
         # Evaluation of phase 2 with MT-Bench
-        # ctx.obj["eval_type"] = "mt-bench"
-        # _ = ctx.invoke(evaluation)
+        ctx.obj["eval_type"] = "mt-bench"
+        scores = ctx.invoke(evaluation)
+        scores = json.loads(scores)
+        best_model = max(scores, key=lambda x: x["overall_score"])
+        logger.info("Best model: %s", best_model.get("model"))
+        ctx.obj["candidate_model"] = best_model.get("model")
+
+        # Final evaluation
+        # TODO


def get_security_context() -> kubernetes.client.V1SecurityContext:
@@ -634,19 +776,27 @@ def get_security_context():
     )
 
 
-def get_sdg_vol_mount() -> kubernetes.client.V1VolumeMount:
+def get_vol_mount() -> list[kubernetes.client.V1VolumeMount]:
     """
     Get the volume mount for the SDG job.
     """
     return [
         kubernetes.client.V1VolumeMount(
-            name=SDG_VOLUME_NAME, mount_path=SDG_PVC_MOUNT_PATH
+            name=DATA_VOLUME_NAME, mount_path=DATA_PVC_MOUNT_PATH
         ),
-        kubernetes.client.V1VolumeMount(
-            name=MODEL_VOLUME_NAME, mount_path=MODEL_PVC_MOUNT_PATH
-        ),
-        kubernetes.client.V1VolumeMount(
-            name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH
+    ]
+
+
+def get_vol() -> list[kubernetes.client.V1Volume]:
+    """
+    Get the volume for the SDG job.
+    """
+    return [
+        kubernetes.client.V1Volume(
+            name=DATA_VOLUME_NAME,
+            persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(
+                claim_name=DATA_PVC_NAME
+            ),
         ),
     ]
 
@@ -681,21 +831,48 @@ def create_sdg_job(
         kubernetes.client.V1Job: A Kubernetes Job object configured with the specified parameters.
""" # Configureate Pod template container + exec_sdg_op_command = """ +{{exec_sdg_op_command}} +""" + exec_sdg_op_args = """ +{{exec_sdg_op_args}} +""" + + exec_huggingface_importer_op_command = """ +{{exec_huggingface_importer_op_command}} +""" + exec_huggingface_importer_op_args = """ +{{exec_huggingface_importer_op_args}} +""" + + exec_data_processing_op_command = """ +{{exec_data_processing_op_command}} +""" + exec_data_processing_op_args = """ +{{exec_data_processing_op_args}} +""" + init_containers = [ kubernetes.client.V1Container( name="sdg-op-fetch-taxonomy-data", image="{{exec_git_clone_op_image}}", command=["/bin/sh", "-c"], args={{exec_git_clone_op_args}}, - volume_mounts=get_sdg_vol_mount(), + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), kubernetes.client.V1Container( name="sdg-op-generate-synthetic-data", - image="{{exec_sdg_op_image}}", - command={{exec_sdg_op_command}}, - args={{exec_sdg_op_args}}, - volume_mounts=get_sdg_vol_mount(), + # image="{{exec_sdg_op_image}}", + image="registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.1-1724960989", + command=["/bin/sh", "-ce"], + args=[ + PYTHON_EXECUTOR.format( + python_code=exec_sdg_op_command, + python_main=exec_sdg_op_args.strip(), + ), + ], + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -709,9 +886,14 @@ def create_sdg_job( kubernetes.client.V1Container( name="huggingface-importer-op", image="{{exec_huggingface_importer_op_image}}", - command={{exec_huggingface_importer_op_command}}, - args={{exec_huggingface_importer_op_args}}, - volume_mounts=get_sdg_vol_mount(), + command=["/bin/sh", "-ce"], + args=[ + PYTHON_EXECUTOR.format( + python_code=exec_huggingface_importer_op_command, + python_main=exec_huggingface_importer_op_args.strip(), + ), + ], + volume_mounts=get_vol_mount(), security_context=get_security_context(), env_from=[ kubernetes.client.V1EnvFromSource( @@ -725,9 +907,14 @@ def create_sdg_job( kubernetes.client.V1Container( name="sdg-preprocess", image="{{exec_data_processing_op_image}}", - command={{exec_data_processing_op_command}}, - args={{exec_data_processing_op_args}}, - volume_mounts=get_sdg_vol_mount(), + command=["/bin/sh", "-ce"], + args=[ + PYTHON_EXECUTOR.format( + python_code=exec_data_processing_op_command, + python_main=exec_data_processing_op_args.strip(), + ), + ], + volume_mounts=get_vol_mount(), security_context=get_security_context(), ), ] @@ -749,30 +936,13 @@ def create_sdg_job( name="copy-model-to-pvc", image=TOOLBOX_IMAGE, command=["/bin/sh", "-c"], - args=[f"cp -r -v {MODEL_PVC_MOUNT_PATH} {TRAINING_PVC_MOUNT_PATH}"], - volume_mounts=get_sdg_vol_mount(), + args=[ + f"cp -r -v {DATA_PVC_MOUNT_PATH} {DATA_PVC_MOUNT_PATH}" + ], # TODO: fix me, dumb line to pass linter, this feat is unused anyway + volume_mounts=get_vol_mount(), ) - volumes = [ - kubernetes.client.V1Volume( - name=SDG_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=SDG_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=MODEL_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=MODEL_PVC_NAME - ), - ), - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] + volumes = get_vol() # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( @@ -805,6 +975,7 
@@ def create_sdg_data_fetch_job( namespace: str, job_name: str, sdg_object_store_secret: str, + force_pull: bool = False, ) -> kubernetes.client.V1Job: """ Create a Kubernetes Job object. @@ -820,104 +991,123 @@ def create_sdg_data_fetch_job( kubernetes.client.V1Job: A Kubernetes Job object configured with the specified parameters. """ - container = kubernetes.client.V1Container( - name="fetch-sdg-files-from-object-store", - image=PYTHON_IMAGE, - command=["/bin/sh", "-c"], - args=[ - SDG_DATA_SCRIPT.format( - strategy="download", SDG_PVC_MOUNT_PATH=SDG_PVC_MOUNT_PATH - ) - ], - volume_mounts=get_sdg_vol_mount(), - env=[ - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ENDPOINT", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="endpoint", optional=True - ) + exec_data_processing_op_command = """ +{{exec_data_processing_op_command}} +""" + exec_data_processing_op_args = """ +{{exec_data_processing_op_args}} +""" + + init_containers = [ + kubernetes.client.V1Container( + name="fetch-sdg-files-from-object-store", + image=DS_IMAGE, + command=["/bin/sh", "-c"], + args=[ + DATA_SCRIPT.format( + strategy="download", + force_pull=force_pull, + data_pvc_mount_path=DATA_PVC_MOUNT_PATH, + ) + ], + volume_mounts=get_vol_mount(), + env=[ + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ENDPOINT", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="endpoint", optional=True + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_BUCKET", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="bucket", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_BUCKET", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="bucket", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_ACCESS_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="access_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_ACCESS_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="access_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_SECRET_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="secret_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_SECRET_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="secret_key", + optional=False, + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_REGION", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="region", optional=True - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_REGION", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="region", optional=True + ) + ), ), - ), - 
kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_DATA_KEY", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="data_key", optional=False - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_DATA_KEY", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, key="data_key", optional=False + ) + ), ), - ), - kubernetes.client.V1EnvVar( - name="SDG_OBJECT_STORE_VERIFY_TLS", - value_from=kubernetes.client.V1EnvVarSource( - secret_key_ref=kubernetes.client.V1SecretKeySelector( - name=sdg_object_store_secret, key="verify_tls", optional=True - ) + kubernetes.client.V1EnvVar( + name="SDG_OBJECT_STORE_VERIFY_TLS", + value_from=kubernetes.client.V1EnvVarSource( + secret_key_ref=kubernetes.client.V1SecretKeySelector( + name=sdg_object_store_secret, + key="verify_tls", + optional=True, + ) + ), ), - ), - ], - ) + ], + ) + ] - volumes = [ - kubernetes.client.V1Volume( - name=SDG_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=SDG_PVC_NAME + container = kubernetes.client.V1Container( + name="sdg-preprocess", + image=RHELAI_IMAGE, + command=["/bin/sh", "-ce"], + args=[ + PYTHON_EXECUTOR.format( + python_code=exec_data_processing_op_command, + python_main=exec_data_processing_op_args.strip(), ), - ), - kubernetes.client.V1Volume( - name=MODEL_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=MODEL_PVC_NAME + ], + volume_mounts=get_vol_mount(), + security_context=get_security_context(), + env_from=[ + kubernetes.client.V1EnvFromSource( + config_map_ref=kubernetes.client.V1ConfigMapEnvSource(name=K8S_NAME) ), - ), - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME + kubernetes.client.V1EnvFromSource( + secret_ref=kubernetes.client.V1SecretEnvSource(name=K8S_NAME) ), - ), - ] + ], + ) # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(labels={"app": "sdg-data-fetch"}), spec=kubernetes.client.V1PodSpec( restart_policy="Never", + init_containers=init_containers, containers=[container], - volumes=volumes, + volumes=get_vol(), ), ) @@ -980,51 +1170,56 @@ def create_eval_job( # ), # ], # ) + + exec_run_mt_bench_op_command = """ +{{exec_run_mt_bench_op_command}} +""" + exec_run_mt_bench_op_args = """ +{{exec_run_mt_bench_op_args}} +""" + if eval_type == "mt-bench": init_containers = [ kubernetes.client.V1Container( name=f"run-eval-{eval_type}", - image="{{exec_run_mt_bench_op_image}}", - command={{exec_run_mt_bench_op_command}}, - args={{exec_run_mt_bench_op_args}}, - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH + image=RHELAI_IMAGE, + command=["/bin/sh", "-ce"], + args=[ + PYTHON_EXECUTOR.format( + python_code=exec_run_mt_bench_op_command, + python_main=exec_run_mt_bench_op_args.strip(), + ), + ], + volume_mounts=get_vol_mount(), + security_context=get_security_context(), + env_from=[ + kubernetes.client.V1EnvFromSource( + secret_ref=kubernetes.client.V1SecretEnvSource( + name=JUDGE_SERVING_NAME + ) ), ], ) ] container = kubernetes.client.V1Container( name=f"output-eval-{eval_type}-scores", - image="{{exec_run_mt_bench_op_image}}", + image=RHELAI_IMAGE, 
command=["/bin/sh", "-c"], args=[f"cat {MT_BENCH_SCORES_PATH}"], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=TRAINING_VOLUME_NAME, mount_path=TRAINING_PVC_MOUNT_PATH - ), - ], + security_context=get_security_context(), + volume_mounts=get_vol_mount(), ) else: raise ValueError(f"Unknown evaluation type: {eval_type}") - volumes = [ - kubernetes.client.V1Volume( - name=TRAINING_VOLUME_NAME, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=TRAINING_PVC_NAME - ), - ), - ] - # Create and configure a spec section template = kubernetes.client.V1PodTemplateSpec( - metadata=kubernetes.client.V1ObjectMeta(labels={"app": "eval"}), + metadata=kubernetes.client.V1ObjectMeta(labels={"app": f"eval-{eval_type}"}), spec=kubernetes.client.V1PodSpec( restart_policy="Never", init_containers=init_containers, containers=[container], - volumes=volumes, + volumes=get_vol(), ), ) @@ -1080,6 +1275,7 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: # Wait for the job to complete w = kubernetes.watch.Watch() + pod_log = None for event in w.stream(batch_v1.list_namespaced_job, namespace=namespace): job_event = event["object"] if job_event.metadata.name != job.metadata.name: @@ -1093,6 +1289,8 @@ def run_job(namespace: str, job: kubernetes.client.V1Job) -> str: job.spec.template.metadata.labels["app"] ), ) + # On success return the logs of the last pod which contains the output + # (useful to get eval scores) pod_log = core_v1.read_namespaced_pod_log( name=pods.items[0].metadata.name, namespace=namespace ) @@ -1205,25 +1403,11 @@ def sdg( # list of PVCs to create and their details pvcs = [ { - "name": SDG_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteOnce"], - "size": "1Gi", - }, - { - "name": MODEL_PVC_NAME, + "name": DATA_PVC_NAME, "namespace": namespace, "storage_class": storage_class, "access_modes": ["ReadWriteOnce"], - "size": "50Gi", - }, - { - "name": TRAINING_PVC_NAME, - "namespace": namespace, - "storage_class": storage_class, - "access_modes": ["ReadWriteMany"], - "size": "50Gi", + "size": "200Gi", }, ] for pvc in pvcs: @@ -1231,7 +1415,7 @@ def sdg( v1.create_namespaced_persistent_volume_claim( namespace=namespace, body=create_pvc(**pvc) ) - logger.info("Successfully creayed PVC '%s' created.", pvc.get("name")) + logger.info("Successfully created PVC '%s' created.", pvc.get("name")) except kubernetes.client.rest.ApiException as exc: if exc.status == 409: logger.info("PVC '%s' already exists.", pvc["name"]) @@ -1305,6 +1489,9 @@ def sdg_data_fetch( # Populate variables from context namespace = ctx.obj["namespace"] storage_class = ctx.obj["storage_class"] + judge_serving_endpoint = ctx.obj["judge_serving_endpoint"] + judge_serving_model_name = ctx.obj["judge_serving_model_name"] + judge_serving_model_api_key = ctx.obj["judge_serving_model_api_key"] sdg_object_store_endpoint = ctx.obj["sdg_object_store_endpoint"] sdg_object_store_bucket = ctx.obj["sdg_object_store_bucket"] sdg_object_store_access_key = ctx.obj["sdg_object_store_access_key"] @@ -1313,6 +1500,10 @@ def sdg_data_fetch( sdg_object_store_data_key = ctx.obj["sdg_object_store_data_key"] sdg_object_store_verify_tls = ctx.obj["sdg_object_store_verify_tls"] sdg_object_store_secret = ctx.obj["sdg_object_store_secret"] + force_pull = ctx.obj["force_pull"] + + # Make sure the endpoint is a valid URL + validate_url(judge_serving_endpoint) # Check if all required arguments are provided if not sdg_object_store_secret: @@ 
@@ -1409,28 +1600,35 @@ def sdg_data_fetch(
             "'bucket', 'access_key', 'secret_key', 'data_key'.",
         )

+    # Create Secret config details for evaluation
+    judge_serving_details_secret = JUDGE_SERVING_NAME
+    secret = kubernetes.client.V1Secret(
+        metadata=kubernetes.client.V1ObjectMeta(
+            name=judge_serving_details_secret, namespace=namespace
+        ),
+        string_data={
+            "JUDGE_NAME": judge_serving_model_name,
+            "JUDGE_API_KEY": judge_serving_model_api_key,
+            "JUDGE_ENDPOINT": judge_serving_endpoint,
+        },
+    )
+
+    try:
+        v1.create_namespaced_secret(namespace=namespace, body=secret)
+    except kubernetes.client.rest.ApiException as exc:
+        if exc.status == 409:
+            logger.info("Secret '%s' already exists.", secret.metadata.name)
+        else:
+            raise
+
     # list of PVCs to create and their details
     pvcs = [
         {
-            "name": SDG_PVC_NAME,
-            "namespace": namespace,
-            "storage_class": storage_class,
-            "access_modes": ["ReadWriteOnce"],
-            "size": "1Gi",
-        },
-        {
-            "name": MODEL_PVC_NAME,
-            "namespace": namespace,
-            "storage_class": storage_class,
-            "access_modes": ["ReadWriteOnce"],
-            "size": "50Gi",
-        },
-        {
-            "name": TRAINING_PVC_NAME,
+            "name": DATA_PVC_NAME,
             "namespace": namespace,
             "storage_class": storage_class,
             "access_modes": ["ReadWriteMany"],
-            "size": "50Gi",
+            "size": "200Gi",  # Allocate size for a few models and large SDG data sets
         },
     ]
     for pvc in pvcs:
@@ -1450,6 +1648,7 @@ def sdg_data_fetch(
         namespace=namespace,
         job_name="sdg-data-fetch",
         sdg_object_store_secret=sdg_object_store_secret,
+        force_pull=force_pull,
     )

     # Run the job
@@ -1469,27 +1668,35 @@ def train(
     training_phase = ctx.obj["training_phase"]
     path_to_model = ctx.obj["model_to_train"]
     nproc_per_node: int = ctx.obj["nproc_per_node"]
+    training_1_epoch_num: int = ctx.obj["training_1_epoch_num"]
+    training_2_epoch_num: int = ctx.obj["training_2_epoch_num"]

     if training_phase is None:
         raise ValueError("Training phase must be provided with --training-phase=[1|2]")

     # During the initial training
     if path_to_model is None:
-        path_to_model = "/input_model"
+        path_to_model = DATA_PVC_MODEL_PATH
+
+    epoch_num = None
+    if training_phase == "1":
+        epoch_num = training_1_epoch_num
+    elif training_phase == "2":
+        epoch_num = training_2_epoch_num

     logger.info("Running multi-phased distributed training phase %s", training_phase)

     worker_replicas = PYTORCH_NNODES - 1
     pytorch_training_job_yaml = yaml.safe_load(
         PYTORCH_TRAINING_JOB.format(
-            name="train-sdg",
-            model_pvc_name="model",
-            input_pvc_name="sdg-data",
-            output_pvc_name="training-data",
+            name=f"train-phase-{training_phase}",
+            data_pvc_name=DATA_PVC_NAME,
             path_to_model=path_to_model,
             nproc_per_node=nproc_per_node,
-            PYTORCH_NNODES=PYTORCH_NNODES,
-            PYTORCH_IMAGE=PYTORCH_IMAGE,
+            nnodes=PYTORCH_NNODES,
+            image=RHELAI_IMAGE,
             worker_replicas=worker_replicas,
+            epoch_num=epoch_num,
+            phase_num=training_phase,
         )
     )
diff --git a/training/components.py b/training/components.py
index f1239d46..d178a04d 100644
--- a/training/components.py
+++ b/training/components.py
@@ -54,7 +54,7 @@ def data_processing(train_args: TrainingArgs) -> None:
     # early validation logic here
     if train_args.max_batch_len < train_args.max_seq_len:
         raise ValueError(
-            f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}"
+            f"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=} < {train_args.max_seq_len=}"
         )

     # process the training data
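Both sdg and sdg_data_fetch above build their claims through the create_pvc helper, whose definition sits outside these hunks. A plausible sketch, assuming the dict keys map straight onto the claim spec (older kubernetes clients use V1ResourceRequirements for PVC resources; newer ones ship V1VolumeResourceRequirements):

import kubernetes.client


def create_pvc(
    name: str,
    namespace: str,
    storage_class: str,
    access_modes: list,
    size: str,
) -> kubernetes.client.V1PersistentVolumeClaim:
    # Build the claim object only; the caller handles the 409
    # "already exists" case when submitting it.
    return kubernetes.client.V1PersistentVolumeClaim(
        metadata=kubernetes.client.V1ObjectMeta(name=name, namespace=namespace),
        spec=kubernetes.client.V1PersistentVolumeClaimSpec(
            access_modes=access_modes,
            storage_class_name=storage_class,
            resources=kubernetes.client.V1ResourceRequirements(
                requests={"storage": size}
            ),
        ),
    )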
diff --git a/utils/helpers/helpers.py b/utils/helpers/helpers.py
index 326fafe8..1e81f20a 100644
--- a/utils/helpers/helpers.py
+++ b/utils/helpers/helpers.py
@@ -51,7 +51,7 @@ def launch_vllm(model_path: str, gpu_count: int, retries: int = 60, delay: int =


 # This seems like excessive effort to stop the vllm process, but merely saving & killing the pid doesn't work
-# Also, the base image does not include `pkill` cmd, so can't pkill -f vllm.entrypoints.openai.api_server either
+# Also, the base image does not include 'pkill' cmd, so can't pkill -f vllm.entrypoints.openai.api_server either
 def stop_vllm():
     import psutil
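For context on the comment above: without pkill, the server has to be located by scanning process command lines. A minimal psutil-based sketch of that approach, not the file's exact implementation:

import psutil


def stop_vllm() -> None:
    # Walk the process table looking for the vLLM OpenAI API server,
    # terminate it, and escalate to SIGKILL if it does not exit in time.
    for proc in psutil.process_iter(["pid", "cmdline"]):
        cmdline = proc.info.get("cmdline") or []
        if any("vllm.entrypoints.openai.api_server" in arg for arg in cmdline):
            proc.terminate()
            try:
                proc.wait(timeout=10)
            except psutil.TimeoutExpired:
                proc.kill()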