diff --git a/eval/final/components.py b/eval/final/components.py index 94124d03..1ca53e47 100644 --- a/eval/final/components.py +++ b/eval/final/components.py @@ -20,9 +20,11 @@ def run_final_eval_op( candidate_model: str = None, taxonomy_path: str = "/input/taxonomy", sdg_path: str = "/input/sdg", + use_tls: bool = False, ): import json import os + import httpx import subprocess import torch @@ -30,25 +32,11 @@ def run_final_eval_op( from instructlab.eval.mt_bench import MTBenchBranchEvaluator from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores, sort_score - if judge_ca_cert := os.getenv("JUDGE_CA_CERT_PATH"): - import httpx - import openai - - # Create a custom HTTP client - class CustomHttpClient(httpx.Client): - def __init__(self, *args, **kwargs): - # Use the custom CA certificate - kwargs.setdefault("verify", judge_ca_cert) - super().__init__(*args, **kwargs) - - # Create a new OpenAI class that uses the custom HTTP client - class CustomOpenAI(openai.OpenAI): - def __init__(self, *args, **kwargs): - custom_client = CustomHttpClient() - super().__init__(http_client=custom_client, *args, **kwargs) - - # Monkey patch the OpenAI class in the openai module, so that the eval lib can use it - openai.OpenAI = CustomOpenAI + judge_api_key = os.getenv("JUDGE_API_KEY", "") + judge_model_name = os.getenv("JUDGE_NAME") + judge_endpoint = os.getenv("JUDGE_ENDPOINT") + judge_ca_cert = os.getenv("JUDGE_CA_CERT_PATH") + judge_http_client = httpx.Client(verify=judge_ca_cert) if use_tls else None print("Starting Final Eval...") @@ -408,6 +396,7 @@ def find_node_dataset_directories(base_dir: str): server_url=vllm_server, serving_gpus=gpu_count, max_workers=max_workers, + http_client=judge_http_client, ) shutdown_vllm(vllm_process) @@ -418,6 +407,7 @@ def find_node_dataset_directories(base_dir: str): api_key=judge_api_key, serving_gpus=gpu_count, max_workers=max_workers, + http_client=judge_http_client, ) qa_pairs_and_errors.append((overall_score, qa_pairs, 
error_rate)) diff --git a/eval/mt_bench/components.py b/eval/mt_bench/components.py index 481952d9..da791545 100644 --- a/eval/mt_bench/components.py +++ b/eval/mt_bench/components.py @@ -17,33 +17,21 @@ def run_mt_bench_op( models_folder: str, output_path: str = "/output/mt_bench_data.json", best_score_file: Optional[str] = None, + use_tls: bool = False, ) -> NamedTuple("outputs", best_model=str, best_score=float): import json import os + import httpx import subprocess import torch from instructlab.eval.mt_bench import MTBenchEvaluator - if judge_ca_cert := os.getenv("JUDGE_CA_CERT_PATH"): - import httpx - import openai - - # Create a custom HTTP client - class CustomHttpClient(httpx.Client): - def __init__(self, *args, **kwargs): - # Use the custom CA certificate - kwargs.setdefault("verify", judge_ca_cert) - super().__init__(*args, **kwargs) - - # Create a new OpenAI class that uses the custom HTTP client - class CustomOpenAI(openai.OpenAI): - def __init__(self, *args, **kwargs): - custom_client = CustomHttpClient() - super().__init__(http_client=custom_client, *args, **kwargs) - - # Monkey patch the OpenAI class in the openai module, so that the eval lib can use it - openai.OpenAI = CustomOpenAI + judge_api_key = os.getenv("JUDGE_API_KEY", "") + judge_model_name = os.getenv("JUDGE_NAME") + judge_endpoint = os.getenv("JUDGE_ENDPOINT") + judge_ca_cert = os.getenv("JUDGE_CA_CERT_PATH") + judge_http_client = httpx.Client(verify=judge_ca_cert) if use_tls else None def launch_vllm( model_path: str, gpu_count: int, retries: int = 120, delay: int = 10 @@ -136,10 +124,6 @@ def shutdown_vllm(process: subprocess.Popen, timeout: int = 20): models_list = os.listdir(models_folder) - judge_api_key = os.getenv("JUDGE_API_KEY", "") - judge_model_name = os.getenv("JUDGE_NAME") - judge_endpoint = os.getenv("JUDGE_ENDPOINT") - scores = {} all_mt_bench_data = [] @@ -175,6 +159,7 @@ def shutdown_vllm(process: subprocess.Popen, timeout: int = 20): server_url=vllm_server, 
serving_gpus=gpu_count, max_workers=max_workers, + http_client=judge_http_client, ) shutdown_vllm(vllm_process) @@ -184,6 +169,7 @@ def shutdown_vllm(process: subprocess.Popen, timeout: int = 20): api_key=judge_api_key, serving_gpus=gpu_count, max_workers=max_workers, + http_client=judge_http_client, ) mt_bench_data = { diff --git a/pipeline.py b/pipeline.py index 7f5b0e51..b64ce1dd 100644 --- a/pipeline.py +++ b/pipeline.py @@ -11,9 +11,11 @@ mount_pvc, set_image_pull_policy, use_config_map_as_env, + use_config_map_as_volume, use_secret_as_env, use_secret_as_volume, ) +import os TEACHER_CONFIG_MAP = "teacher-server" TEACHER_SECRET = "teacher-server" @@ -26,6 +28,15 @@ GENERATED_STANDALONE_FILE_NAME = "standalone.py" DEFAULT_REPO_URL = "https://github.com/instructlab/taxonomy.git" +# Model Serving SSL connection +SDG_CA_CERT_CM_KEY = "ca.crt" +SDG_CA_CERT_ENV_VAR_NAME = "SDG_CA_CERT_PATH" +SDG_CA_CERT_PATH = "/tmp/cert" + +JUDGE_CA_CERT_CM_KEY = "ca.crt" +JUDGE_CA_CERT_ENV_VAR_NAME = "JUDGE_CA_CERT_PATH" +JUDGE_CA_CERT_PATH = "/tmp/cert" + def ilab_pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]): """Wrapper for KFP pipeline, which allows for mocking individual stages.""" @@ -93,6 +104,8 @@ def pipeline( sdg_pipeline: str = "full", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L122 sdg_max_batch_len: int = 5000, # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L334 sdg_sample_size: float = 1.0, # FIXME: Not present in default config. Not configurable upstream at this point, capability added via https://github.com/instructlab/sdg/pull/432 + sdg_use_tls: bool = False, + # Training phase train_nproc_per_node: int = 2, # FIXME: Not present in default config. Arbitrary value chosen to demonstrate multi-node multi-gpu capabilities. Needs proper reference architecture justification. train_nnodes: int = 2, # FIXME: Not present in default config. 
Arbitrary value chosen to demonstrate multi-node multi-gpu capabilities. Needs proper reference architecture justification. @@ -110,11 +123,15 @@ def pipeline( # MT Bench mt_bench_max_workers: str = "auto", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L74 mt_bench_merge_system_user_message: bool = False, # https://github.com/instructlab/instructlab/blob/v0.21.2/src/instructlab/model/evaluate.py#L474 + mt_bench_use_tls: bool = False, + # Final evaluation final_eval_max_workers: str = "auto", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L74 final_eval_few_shots: int = 5, # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L56 final_eval_batch_size: str = "auto", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L52 final_eval_merge_system_user_message: bool = False, # https://github.com/instructlab/instructlab/blob/v0.21.2/src/instructlab/model/evaluate.py#L474 + final_eval_use_tls: bool = False, + # Other options k8s_storage_class_name: str = "standard", # FIXME: https://github.com/kubeflow/pipelines/issues/11396, https://issues.redhat.com/browse/RHOAIRFE-470 ): @@ -129,6 +146,7 @@ def pipeline( sdg_pipeline: SDG parameter. Data generation pipeline to use. Available: 'simple', 'full', or a valid path to a directory of pipeline workflow YAML files. Note that 'full' requires a larger teacher model, Mixtral-8x7b. sdg_max_batch_len: SDG parameter. Maximum tokens per gpu for each batch that will be handled in a single step. sdg_sample_size: SDG parameter. Represents the sdg skills recipe sampling size as percentage in decimal form. + sdg_use_tls: SDG parameter. Use TLS Certs (defined in the ConfigMap 'teacher-server' under key 'ca.crt') to connect to the Teacher model train_nproc_per_node: Training parameter. Number of GPUs per each node/worker to use for training. 
train_nnodes: Training parameter. Number of nodes/workers to train on. @@ -146,11 +164,13 @@ def pipeline( mt_bench_max_workers: MT Bench parameter. Number of workers to use for evaluation with mt_bench or mt_bench_branch. Must be a positive integer or 'auto'. mt_bench_merge_system_user_message: MT Bench parameter. Boolean indicating whether to merge system and user messages (required for Mistral based judges) + mt_bench_use_tls: MT Bench parameter. Use TLS Certs (defined in the ConfigMap 'judge-server' under key 'ca.crt') to connect to the Judge model final_eval_max_workers: Final model evaluation parameter for MT Bench Branch. Number of workers to use for evaluation with mt_bench or mt_bench_branch. Must be a positive integer or 'auto'. final_eval_few_shots: Final model evaluation parameter for MMLU. Number of question-answer pairs provided in the context preceding the question used for evaluation. final_eval_batch_size: Final model evaluation parameter for MMLU. Batch size for evaluation. Valid values are a positive integer or 'auto' to select the largest batch size that will fit in memory. final_eval_merge_system_user_message: Final model evaluation parameter for MT Bench Branch. Boolean indicating whether to merge system and user messages (required for Mistral based judges) + final_eval_use_tls: Final model evaluation parameter. Use TLS Certs (defined in the ConfigMap 'judge-server' under key 'ca.crt') to connect to the Judge model k8s_storage_class_name: A Kubernetes StorageClass name for persistent volumes. Selected StorageClass must support RWX PersistentVolumes. 
""" @@ -180,6 +200,7 @@ def pipeline( repo_branch=sdg_repo_branch, repo_pr=sdg_repo_pr, sdg_sampling_size=sdg_sample_size, + use_tls=sdg_use_tls, ) sdg_task.set_env_variable("HOME", "/tmp") sdg_task.set_env_variable("HF_HOME", "/tmp") @@ -187,6 +208,9 @@ def pipeline( sdg_task, TEACHER_CONFIG_MAP, dict(endpoint="endpoint", model="model") ) use_secret_as_env(sdg_task, TEACHER_SECRET, {"api_key": "api_key"}) + use_config_map_as_volume(sdg_task, TEACHER_CONFIG_MAP, mount_path=SDG_CA_CERT_PATH) + sdg_task.set_env_variable(SDG_CA_CERT_ENV_VAR_NAME, os.path.join(SDG_CA_CERT_PATH, SDG_CA_CERT_CM_KEY)) + sdg_task.after(git_clone_task) mount_pvc( task=sdg_task, @@ -330,6 +354,7 @@ def pipeline( models_folder="/output/phase_2/model/hf_format", max_workers=mt_bench_max_workers, merge_system_user_message=mt_bench_merge_system_user_message, + use_tls=mt_bench_use_tls, ) mount_pvc( task=run_mt_bench_task, @@ -349,6 +374,9 @@ def pipeline( ) use_secret_as_env(run_mt_bench_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"}) + use_config_map_as_volume(run_mt_bench_task, JUDGE_CONFIG_MAP, mount_path=JUDGE_CA_CERT_PATH) + run_mt_bench_task.set_env_variable(JUDGE_CA_CERT_ENV_VAR_NAME, os.path.join(JUDGE_CA_CERT_PATH, JUDGE_CA_CERT_CM_KEY)) + # uncomment if updating image with same tag # set_image_pull_policy(run_mt_bench_task, "Always") @@ -362,6 +390,7 @@ def pipeline( merge_system_user_message=final_eval_merge_system_user_message, few_shots=final_eval_few_shots, batch_size=final_eval_batch_size, + use_tls=final_eval_use_tls, ) mount_pvc( task=final_eval_task, pvc_name=output_pvc_task.output, mount_path="/output" @@ -391,6 +420,9 @@ def pipeline( use_secret_as_env(final_eval_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"}) + use_config_map_as_volume(final_eval_task, JUDGE_CONFIG_MAP, mount_path=JUDGE_CA_CERT_PATH) + final_eval_task.set_env_variable(JUDGE_CA_CERT_ENV_VAR_NAME, os.path.join(JUDGE_CA_CERT_PATH, JUDGE_CA_CERT_CM_KEY)) + final_eval_task.after(run_mt_bench_task) 
final_eval_task.set_accelerator_type("nvidia.com/gpu") final_eval_task.set_accelerator_limit(1) @@ -592,10 +624,10 @@ def gen_standalone(): # The list of executor names to extract details from to generate the standalone script executors = { "exec-data-processing-op": 'data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg_path="{DATA_PVC_SDG_PATH}", model_path="{DATA_PVC_MODEL_PATH}", skills_path="{PREPROCESSED_DATA_SKILLS_PATH}", knowledge_path="{PREPROCESSED_DATA_KNOWLEDGE_PATH}")', - "exec-sdg-op": 'sdg_op(num_instructions_to_generate={num_instructions_to_generate}, pipeline="{sdg_pipeline}", repo_branch="{exec_git_clone_op_repo_branch or ""}", repo_pr={exec_git_clone_op_repo_pr or 0}, taxonomy_path="{TAXONOMY_DATA_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", sdg_sampling_size={sdg_sampling_size})', + "exec-sdg-op": 'sdg_op(num_instructions_to_generate={num_instructions_to_generate}, pipeline="{sdg_pipeline}", repo_branch="{exec_git_clone_op_repo_branch or ""}", repo_pr={exec_git_clone_op_repo_pr or 0}, taxonomy_path="{TAXONOMY_DATA_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", sdg_sampling_size={sdg_sampling_size}, use_tls={sdg_use_tls})', "exec-git-clone-op": {}, - "exec-run-mt-bench-op": 'run_mt_bench_op(best_score_file="{MT_BENCH_SCORES_PATH}",output_path="{MT_BENCH_OUTPUT_PATH}",models_folder="{CANDIDATE_MODEL_PATH_PREFIX}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE})', - "exec-run-final-eval-op": 'run_final_eval_op(mmlu_branch_output="{MMLU_BRANCH_SCORES_PATH}", mt_bench_branch_output="{MT_BENCH_BRANCH_SCORES_PATH}", candidate_model="{CANDIDATE_MODEL_PATH}", taxonomy_path="{TAXONOMY_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", base_branch="", candidate_branch="", base_model_dir="{DATA_PVC_MODEL_PATH}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE}, few_shots={FEW_SHOTS}, batch_size="{BATCH_SIZE}")', + "exec-run-mt-bench-op": 
'run_mt_bench_op(best_score_file="{MT_BENCH_SCORES_PATH}",output_path="{MT_BENCH_OUTPUT_PATH}",models_folder="{CANDIDATE_MODEL_PATH_PREFIX}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE}, use_tls={mt_bench_use_tls})', + "exec-run-final-eval-op": 'run_final_eval_op(mmlu_branch_output="{MMLU_BRANCH_SCORES_PATH}", mt_bench_branch_output="{MT_BENCH_BRANCH_SCORES_PATH}", candidate_model="{CANDIDATE_MODEL_PATH}", taxonomy_path="{TAXONOMY_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", base_branch="", candidate_branch="", base_model_dir="{DATA_PVC_MODEL_PATH}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE}, few_shots={FEW_SHOTS}, batch_size="{BATCH_SIZE}", use_tls={final_eval_use_tls})', } details = {} diff --git a/pipeline.yaml b/pipeline.yaml index bdcc742d..47e8f289 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -6,9 +6,11 @@ # final_eval_few_shots: int [Default: 5.0] # final_eval_max_workers: str [Default: 'auto'] # final_eval_merge_system_user_message: bool [Default: False] +# final_eval_use_tls: bool [Default: False] # k8s_storage_class_name: str [Default: 'standard'] # mt_bench_max_workers: str [Default: 'auto'] # mt_bench_merge_system_user_message: bool [Default: False] +# mt_bench_use_tls: bool [Default: False] # sdg_base_model: str [Default: 's3:///'] # sdg_max_batch_len: int [Default: 5000.0] # sdg_pipeline: str [Default: 'full'] @@ -17,6 +19,7 @@ # sdg_repo_url: str [Default: 'https://github.com/instructlab/taxonomy.git'] # sdg_sample_size: float [Default: 1.0] # sdg_scale_factor: int [Default: 30.0] +# sdg_use_tls: bool [Default: False] # train_effective_batch_size_phase_1: int [Default: 128.0] # train_effective_batch_size_phase_2: int [Default: 3840.0] # train_learning_rate_phase_1: float [Default: 2e-05] @@ -472,6 +475,10 @@ components: defaultValue: /input/taxonomy isOptional: true parameterType: STRING + use_tls: + defaultValue: false + isOptional: true + parameterType: BOOLEAN 
outputDefinitions: artifacts: mmlu_branch_output: @@ -499,6 +506,10 @@ components: defaultValue: /output/mt_bench_data.json isOptional: true parameterType: STRING + use_tls: + defaultValue: false + isOptional: true + parameterType: BOOLEAN outputDefinitions: parameters: best_model: @@ -529,6 +540,10 @@ components: defaultValue: /data/taxonomy isOptional: true parameterType: STRING + use_tls: + defaultValue: false + isOptional: true + parameterType: BOOLEAN comp-sdg-to-artifact-op: executorLabel: exec-sdg-to-artifact-op inputDefinitions: @@ -1143,25 +1158,19 @@ deploymentSpec: \ base_branch: str,\n candidate_branch: str,\n max_workers: str,\n\ \ few_shots: int,\n batch_size: str,\n merge_system_user_message:\ \ bool,\n candidate_model: str = None,\n taxonomy_path: str = \"/input/taxonomy\"\ - ,\n sdg_path: str = \"/input/sdg\",\n):\n import json\n import\ - \ os\n import subprocess\n\n import torch\n from instructlab.eval.mmlu\ - \ import MMLUBranchEvaluator\n from instructlab.eval.mt_bench import\ - \ MTBenchBranchEvaluator\n from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores,\ - \ sort_score\n\n if judge_ca_cert := os.getenv(\"JUDGE_CA_CERT_PATH\"\ - ):\n import httpx\n import openai\n\n # Create a custom\ - \ HTTP client\n class CustomHttpClient(httpx.Client):\n \ - \ def __init__(self, *args, **kwargs):\n # Use the custom\ - \ CA certificate\n kwargs.setdefault(\"verify\", judge_ca_cert)\n\ - \ super().__init__(*args, **kwargs)\n\n # Create a\ - \ new OpenAI class that uses the custom HTTP client\n class CustomOpenAI(openai.OpenAI):\n\ - \ def __init__(self, *args, **kwargs):\n custom_client\ - \ = CustomHttpClient()\n super().__init__(http_client=custom_client,\ - \ *args, **kwargs)\n\n # Monkey patch the OpenAI class in the openai\ - \ module, so that the eval lib can use it\n openai.OpenAI = CustomOpenAI\n\ - \n print(\"Starting Final Eval...\")\n\n def launch_vllm(\n \ - \ model_path: str, gpu_count: int, retries: int = 120, delay: int = 
10\n\ - \ ) -> tuple:\n import subprocess\n import sys\n \ - \ import time\n\n import requests\n from instructlab.model.backends.common\ + ,\n sdg_path: str = \"/input/sdg\",\n use_tls: bool = False,\n):\n\ + \ import json\n import os\n import httpx\n import subprocess\n\ + \n import torch\n from instructlab.eval.mmlu import MMLUBranchEvaluator\n\ + \ from instructlab.eval.mt_bench import MTBenchBranchEvaluator\n from\ + \ instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores, sort_score\n\ + \n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n judge_model_name\ + \ = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"JUDGE_ENDPOINT\"\ + )\n judge_ca_cert = os.getenv(\"JUDGE_CA_CERT_PATH\")\n judge_http_client\ + \ = httpx.Client(verify=judge_ca_cert) if use_tls else None\n\n print(\"\ + Starting Final Eval...\")\n\n def launch_vllm(\n model_path: str,\ + \ gpu_count: int, retries: int = 120, delay: int = 10\n ) -> tuple:\n\ + \ import subprocess\n import sys\n import time\n\n\ + \ import requests\n from instructlab.model.backends.common\ \ import free_tcp_ipv4_port\n\n free_port = free_tcp_ipv4_port(\"\ 127.0.0.1\")\n port = str(free_port)\n vllm_server = f\"http://127.0.0.1:{port}/v1\"\ \n\n command = [\n sys.executable,\n \"-m\"\ @@ -1335,11 +1344,12 @@ deploymentSpec: \ for branch {branch}...\"\n )\n vllm_process, vllm_server\ \ = launch_vllm(m_path, gpu_count)\n\n evaluator.gen_answers(\n \ \ server_url=vllm_server,\n serving_gpus=gpu_count,\n\ - \ max_workers=max_workers,\n )\n\n shutdown_vllm(vllm_process)\n\ - \n print(f\"Evaluating answers for branch {branch}...\")\n \ - \ overall_score, qa_pairs, error_rate = evaluator.judge_answers(\n \ - \ server_url=judge_endpoint,\n api_key=judge_api_key,\n\ - \ serving_gpus=gpu_count,\n max_workers=max_workers,\n\ + \ max_workers=max_workers,\n http_client=judge_http_client,\n\ + \ )\n\n shutdown_vllm(vllm_process)\n\n print(f\"Evaluating\ + \ answers for branch {branch}...\")\n overall_score, 
qa_pairs, error_rate\ + \ = evaluator.judge_answers(\n server_url=judge_endpoint,\n \ + \ api_key=judge_api_key,\n serving_gpus=gpu_count,\n\ + \ max_workers=max_workers,\n http_client=judge_http_client,\n\ \ )\n\n qa_pairs_and_errors.append((overall_score, qa_pairs,\ \ error_rate))\n\n overall_score, qa_pairs, error_rate = qa_pairs_and_errors[0]\n\ \ base_overall_score, base_qa_pairs, base_error_rate = qa_pairs_and_errors[1]\n\ @@ -1373,6 +1383,8 @@ deploymentSpec: value: /tmp - name: HF_HOME value: /tmp + - name: JUDGE_CA_CERT_PATH + value: /tmp/cert/ca.crt image: registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1 resources: accelerator: @@ -1403,40 +1415,33 @@ deploymentSpec: \ calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ \ max_workers: str,\n models_folder: str,\n output_path: str =\ \ \"/output/mt_bench_data.json\",\n best_score_file: Optional[str] =\ - \ None,\n) -> NamedTuple(\"outputs\", best_model=str, best_score=float):\n\ - \ import json\n import os\n import subprocess\n\n import torch\n\ - \ from instructlab.eval.mt_bench import MTBenchEvaluator\n\n if judge_ca_cert\ - \ := os.getenv(\"JUDGE_CA_CERT_PATH\"):\n import httpx\n import\ - \ openai\n\n # Create a custom HTTP client\n class CustomHttpClient(httpx.Client):\n\ - \ def __init__(self, *args, **kwargs):\n # Use\ - \ the custom CA certificate\n kwargs.setdefault(\"verify\"\ - , judge_ca_cert)\n super().__init__(*args, **kwargs)\n\n\ - \ # Create a new OpenAI class that uses the custom HTTP client\n\ - \ class CustomOpenAI(openai.OpenAI):\n def __init__(self,\ - \ *args, **kwargs):\n custom_client = CustomHttpClient()\n\ - \ super().__init__(http_client=custom_client, *args, **kwargs)\n\ - \n # Monkey patch the OpenAI class in the openai module, so that\ - \ the eval lib can use it\n openai.OpenAI = CustomOpenAI\n\n def\ - \ launch_vllm(\n model_path: str, gpu_count: int, retries: int =\ - \ 120, delay: int = 
10\n ) -> tuple:\n import subprocess\n \ - \ import sys\n import time\n\n import requests\n \ - \ from instructlab.model.backends.common import free_tcp_ipv4_port\n\n\ - \ free_port = free_tcp_ipv4_port(\"127.0.0.1\")\n port = str(free_port)\n\ - \ vllm_server = f\"http://127.0.0.1:{port}/v1\"\n\n command\ - \ = [\n sys.executable,\n \"-m\",\n \"\ - vllm.entrypoints.openai.api_server\",\n \"--port\",\n \ - \ port,\n \"--model\",\n model_path,\n \ - \ ]\n if gpu_count > 0:\n command += [\n \ - \ \"--tensor-parallel-size\",\n str(gpu_count),\n \ - \ ]\n\n process = subprocess.Popen(args=command)\n\n \ - \ print(f\"Waiting for vLLM server to start at {vllm_server}...\")\n\n\ - \ for attempt in range(retries):\n try:\n \ - \ response = requests.get(f\"{vllm_server}/models\")\n \ - \ if response.status_code == 200:\n print(f\"vLLM server\ - \ is up and running at {vllm_server}.\")\n return process,\ - \ vllm_server\n except requests.ConnectionError:\n \ - \ pass\n\n print(\n f\"Server not available\ - \ yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})...\"\ + \ None,\n use_tls: bool = False,\n) -> NamedTuple(\"outputs\", best_model=str,\ + \ best_score=float):\n import json\n import os\n import httpx\n\ + \ import subprocess\n\n import torch\n from instructlab.eval.mt_bench\ + \ import MTBenchEvaluator\n\n judge_api_key = os.getenv(\"JUDGE_API_KEY\"\ + , \"\")\n judge_model_name = os.getenv(\"JUDGE_NAME\")\n judge_endpoint\ + \ = os.getenv(\"JUDGE_ENDPOINT\")\n judge_ca_cert = os.getenv(\"JUDGE_CA_CERT_PATH\"\ + )\n judge_http_client = httpx.Client(verify=judge_ca_cert) if use_tls\ + \ else None\n\n def launch_vllm(\n model_path: str, gpu_count:\ + \ int, retries: int = 120, delay: int = 10\n ) -> tuple:\n import\ + \ subprocess\n import sys\n import time\n\n import\ + \ requests\n from instructlab.model.backends.common import free_tcp_ipv4_port\n\ + \n free_port = free_tcp_ipv4_port(\"127.0.0.1\")\n port =\ + \ str(free_port)\n vllm_server = 
f\"http://127.0.0.1:{port}/v1\"\n\ + \n command = [\n sys.executable,\n \"-m\",\n\ + \ \"vllm.entrypoints.openai.api_server\",\n \"--port\"\ + ,\n port,\n \"--model\",\n model_path,\n\ + \ ]\n if gpu_count > 0:\n command += [\n \ + \ \"--tensor-parallel-size\",\n str(gpu_count),\n\ + \ ]\n\n process = subprocess.Popen(args=command)\n\n \ + \ print(f\"Waiting for vLLM server to start at {vllm_server}...\"\ + )\n\n for attempt in range(retries):\n try:\n \ + \ response = requests.get(f\"{vllm_server}/models\")\n \ + \ if response.status_code == 200:\n print(f\"vLLM\ + \ server is up and running at {vllm_server}.\")\n return\ + \ process, vllm_server\n except requests.ConnectionError:\n \ + \ pass\n\n print(\n f\"Server not\ + \ available yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})...\"\ \n )\n time.sleep(delay)\n\n raise RuntimeError(\n\ \ f\"Failed to start vLLM server at {vllm_server} after {retries}\ \ retries.\"\n )\n\n def shutdown_vllm(process: subprocess.Popen,\ @@ -1460,9 +1465,7 @@ deploymentSpec: \ gpu_available\n else \"No GPU available\"\n )\n gpu_count\ \ = torch.cuda.device_count() if gpu_available else 0\n\n print(f\"GPU\ \ Available: {gpu_available}, {gpu_name}\")\n\n models_list = os.listdir(models_folder)\n\ - \n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n judge_model_name\ - \ = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"JUDGE_ENDPOINT\"\ - )\n\n scores = {}\n all_mt_bench_data = []\n\n # generate_answers,judgment\ + \n scores = {}\n all_mt_bench_data = []\n\n # generate_answers,judgment\ \ uses a magic word for its mt_bench evaluator - 'auto'\n # with 'auto',\ \ number of gpus allocated for serving is calculated based on environment\n\ \ # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ @@ -1480,25 +1483,25 @@ deploymentSpec: \ output_dir=\"/tmp/eval_output\",\n merge_system_user_message=merge_system_user_message,\n\ \ )\n\n evaluator.gen_answers(\n 
server_url=vllm_server,\n\ \ serving_gpus=gpu_count,\n max_workers=max_workers,\n\ - \ )\n\n shutdown_vllm(vllm_process)\n\n overall_score,\ - \ qa_pairs, turn_scores, error_rate = evaluator.judge_answers(\n \ - \ server_url=judge_endpoint,\n api_key=judge_api_key,\n \ - \ serving_gpus=gpu_count,\n max_workers=max_workers,\n\ - \ )\n\n mt_bench_data = {\n \"report_title\": \"\ - SKILLS EVALUATION REPORT\",\n \"model\": model_path,\n \ - \ \"judge_model\": judge_model_name,\n \"overall_score\"\ - : overall_score,\n \"turn_scores\": turn_scores,\n \ - \ \"qa_scores\": qa_pairs,\n \"error_rate\": error_rate,\n \ - \ }\n\n all_mt_bench_data.append(mt_bench_data)\n scores[model_path]\ - \ = overall_score\n\n with open(output_path, \"w\", encoding=\"utf-8\"\ - ) as f:\n json.dump(all_mt_bench_data, f, indent=4)\n\n outputs\ - \ = NamedTuple(\"outputs\", best_model=str, best_score=float)\n best_model\ - \ = max(scores, key=scores.get)\n best_score = scores[best_model]\n \ - \ if best_score_file:\n with open(best_score_file, \"w\", encoding=\"\ - utf-8\") as f:\n json.dump({\"best_model\": best_model, \"best_score\"\ - : best_score}, f, indent=4)\n\n # Rename the best model directory to\ - \ \"candidate_model\" for the next step\n # So we know which model to\ - \ use for the final evaluation\n if os.path.exists(os.path.join(models_folder,\ + \ http_client=judge_http_client,\n )\n\n shutdown_vllm(vllm_process)\n\ + \n overall_score, qa_pairs, turn_scores, error_rate = evaluator.judge_answers(\n\ + \ server_url=judge_endpoint,\n api_key=judge_api_key,\n\ + \ serving_gpus=gpu_count,\n max_workers=max_workers,\n\ + \ http_client=judge_http_client,\n )\n\n mt_bench_data\ + \ = {\n \"report_title\": \"SKILLS EVALUATION REPORT\",\n \ + \ \"model\": model_path,\n \"judge_model\": judge_model_name,\n\ + \ \"overall_score\": overall_score,\n \"turn_scores\"\ + : turn_scores,\n \"qa_scores\": qa_pairs,\n \"error_rate\"\ + : error_rate,\n }\n\n 
all_mt_bench_data.append(mt_bench_data)\n\ + \ scores[model_path] = overall_score\n\n with open(output_path,\ + \ \"w\", encoding=\"utf-8\") as f:\n json.dump(all_mt_bench_data,\ + \ f, indent=4)\n\n outputs = NamedTuple(\"outputs\", best_model=str,\ + \ best_score=float)\n best_model = max(scores, key=scores.get)\n best_score\ + \ = scores[best_model]\n if best_score_file:\n with open(best_score_file,\ + \ \"w\", encoding=\"utf-8\") as f:\n json.dump({\"best_model\"\ + : best_model, \"best_score\": best_score}, f, indent=4)\n\n # Rename\ + \ the best model directory to \"candidate_model\" for the next step\n \ + \ # So we know which model to use for the final evaluation\n if os.path.exists(os.path.join(models_folder,\ \ \"candidate_model\")):\n print(\"candidate_model already exists.\ \ Skipping renaming\")\n else:\n os.rename(\n os.path.join(models_folder,\ \ best_model),\n os.path.join(models_folder, \"candidate_model\"\ @@ -1509,6 +1512,8 @@ deploymentSpec: value: /tmp - name: HF_HOME value: /tmp + - name: JUDGE_CA_CERT_PATH + value: /tmp/cert/ca.crt image: registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1 resources: accelerator: @@ -1536,12 +1541,13 @@ deploymentSpec: \ *\n\ndef sdg_op(\n num_instructions_to_generate: int,\n pipeline:\ \ str,\n repo_branch: Optional[str],\n repo_pr: Optional[int],\n \ \ taxonomy_path: str = \"/data/taxonomy\",\n sdg_path: str = \"/data/sdg\"\ - ,\n sdg_sampling_size: float = 1.0,\n):\n from os import getenv, path\n\ - \n import instructlab.sdg\n import openai\n import yaml\n\n \ - \ api_key = getenv(\"api_key\")\n model = getenv(\"model\")\n endpoint\ - \ = getenv(\"endpoint\")\n\n if sdg_ca_cert := getenv(\"SDG_CA_CERT_PATH\"\ - ):\n import httpx\n\n custom_http_client = httpx.Client(verify=sdg_ca_cert)\n\ - \ client = openai.OpenAI(\n base_url=endpoint, api_key=api_key,\ + ,\n sdg_sampling_size: float = 1.0,\n use_tls: bool = False,\n):\n\ + \ from os import getenv, path\n\n import instructlab.sdg\n import\ + \ 
openai\n import yaml\n\n api_key = getenv(\"api_key\")\n model\ + \ = getenv(\"model\")\n endpoint = getenv(\"endpoint\")\n\n if use_tls:\n\ + \ import httpx\n\n sdg_ca_cert = getenv(\"SDG_CA_CERT_PATH\"\ + )\n custom_http_client = httpx.Client(verify=sdg_ca_cert)\n \ + \ client = openai.OpenAI(\n base_url=endpoint, api_key=api_key,\ \ http_client=custom_http_client\n )\n else:\n client =\ \ openai.OpenAI(base_url=endpoint, api_key=api_key)\n\n taxonomy_base\ \ = \"main\" if repo_branch or (repo_pr and int(repo_pr) > 0) else \"empty\"\ @@ -1619,6 +1625,8 @@ deploymentSpec: value: /tmp - name: HF_HOME value: /tmp + - name: SDG_CA_CERT_PATH + value: /tmp/cert/ca.crt image: registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1 exec-sdg-to-artifact-op: container: @@ -1989,6 +1997,8 @@ root: componentInputParameter: final_eval_max_workers merge_system_user_message: componentInputParameter: final_eval_merge_system_user_message + use_tls: + componentInputParameter: final_eval_use_tls taskInfo: name: run-final-eval-op run-mt-bench-op: @@ -2007,6 +2017,8 @@ root: models_folder: runtimeValue: constant: /output/phase_2/model/hf_format + use_tls: + componentInputParameter: mt_bench_use_tls taskInfo: name: run-mt-bench-op sdg-op: @@ -2028,6 +2040,8 @@ root: componentInputParameter: sdg_repo_pr sdg_sampling_size: componentInputParameter: sdg_sample_size + use_tls: + componentInputParameter: sdg_use_tls taskInfo: name: sdg-op sdg-to-artifact-op: @@ -2090,6 +2104,10 @@ root: based judges) isOptional: true parameterType: BOOLEAN + final_eval_use_tls: + defaultValue: false + isOptional: true + parameterType: BOOLEAN k8s_storage_class_name: defaultValue: standard description: A Kubernetes StorageClass name for persistent volumes. Selected @@ -2108,6 +2126,13 @@ root: and user messages (required for Mistral based judges) isOptional: true parameterType: BOOLEAN + mt_bench_use_tls: + defaultValue: false + description: Final model evaluation parameter. 
Use TLS Certs (defined in + the ConfigMap 'judge-server' under key 'ca.crt') to connect to the Judge + model + isOptional: true + parameterType: BOOLEAN sdg_base_model: defaultValue: s3:/// description: SDG parameter. LLM model used to generate the synthetic dataset @@ -2152,6 +2177,12 @@ root: description: SDG parameter. The total number of instructions to be generated. isOptional: true parameterType: NUMBER_INTEGER + sdg_use_tls: + defaultValue: false + description: SDG parameter. Use TLS Certs (defined in the ConfigMap 'teacher-server' + under key 'ca.crt') to connect to the Teacher model + isOptional: true + parameterType: BOOLEAN train_effective_batch_size_phase_1: defaultValue: 128.0 description: Training parameter for in Phase 1. The number of samples in a @@ -2293,6 +2324,10 @@ platforms: envVar: JUDGE_ENDPOINT - configMapKey: model envVar: JUDGE_NAME + configMapAsVolume: + - configMapName: judge-server + mountPath: /tmp/cert + optional: false pvcMount: - mountPath: /output taskOutputParameter: @@ -2319,6 +2354,10 @@ platforms: envVar: JUDGE_ENDPOINT - configMapKey: model envVar: JUDGE_NAME + configMapAsVolume: + - configMapName: judge-server + mountPath: /tmp/cert + optional: false pvcMount: - mountPath: /output taskOutputParameter: @@ -2337,6 +2376,10 @@ platforms: envVar: endpoint - configMapKey: model envVar: model + configMapAsVolume: + - configMapName: teacher-server + mountPath: /tmp/cert + optional: false pvcMount: - mountPath: /data taskOutputParameter: diff --git a/sdg/components.py b/sdg/components.py index e3370e67..e248b86f 100644 --- a/sdg/components.py +++ b/sdg/components.py @@ -36,6 +36,7 @@ def sdg_op( taxonomy_path: str = "/data/taxonomy", sdg_path: str = "/data/sdg", sdg_sampling_size: float = 1.0, + use_tls: bool = False, ): from os import getenv, path @@ -47,9 +48,10 @@ def sdg_op( model = getenv("model") endpoint = getenv("endpoint") - if sdg_ca_cert := getenv("SDG_CA_CERT_PATH"): + if use_tls: import httpx + sdg_ca_cert = 
getenv("SDG_CA_CERT_PATH") custom_http_client = httpx.Client(verify=sdg_ca_cert) client = openai.OpenAI( base_url=endpoint, api_key=api_key, http_client=custom_http_client