Parameterize TLS Certs for SDG and Eval model connections
- Add _use_tls switch params for sdg, mt_bench, and final_eval phases
- Update HTTP client generation for the eval steps to create an httpx client
  directly instead of monkey-patching the OpenAI class with a custom subclass

Signed-off-by: Giulio Frasca <[email protected]>
gmfrasca committed Dec 20, 2024
1 parent cd8bce7 commit 5be3f90
Showing 5 changed files with 185 additions and 132 deletions.
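The heart of the change is visible in the first two files below: a module-level monkey patch of openai.OpenAI is replaced by a single, explicitly constructed httpx client. A minimal before/after sketch of the pattern (names follow the hunks below; `use_tls` stands in for the new `*_use_tls` pipeline parameters):

import os

import httpx
import openai

judge_ca_cert = os.getenv("JUDGE_CA_CERT_PATH")

# Before: patch openai.OpenAI globally so that every client the eval library
# constructs trusts the judge server's CA bundle.
class CustomOpenAI(openai.OpenAI):
    def __init__(self, *args, **kwargs):
        kwargs.setdefault("http_client", httpx.Client(verify=judge_ca_cert))
        super().__init__(*args, **kwargs)

openai.OpenAI = CustomOpenAI  # side effect on the whole openai module

# After: build one client up front and hand it explicitly to the eval calls
# via the http_client keyword shown in the hunks below.
use_tls = True
judge_http_client = httpx.Client(verify=judge_ca_cert) if use_tls else None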
eval/final/components.py (28 changes: 9 additions & 19 deletions)
@@ -20,35 +20,23 @@ def run_final_eval_op(
candidate_model: str = None,
taxonomy_path: str = "/input/taxonomy",
sdg_path: str = "/input/sdg",
use_tls: bool = False,
):
import json
import os
import httpx
import subprocess

import torch
from instructlab.eval.mmlu import MMLUBranchEvaluator
from instructlab.eval.mt_bench import MTBenchBranchEvaluator
from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores, sort_score

if judge_ca_cert := os.getenv("JUDGE_CA_CERT_PATH"):
import httpx
import openai

# Create a custom HTTP client
class CustomHttpClient(httpx.Client):
def __init__(self, *args, **kwargs):
# Use the custom CA certificate
kwargs.setdefault("verify", judge_ca_cert)
super().__init__(*args, **kwargs)

# Create a new OpenAI class that uses the custom HTTP client
class CustomOpenAI(openai.OpenAI):
def __init__(self, *args, **kwargs):
custom_client = CustomHttpClient()
super().__init__(http_client=custom_client, *args, **kwargs)

# Monkey patch the OpenAI class in the openai module, so that the eval lib can use it
openai.OpenAI = CustomOpenAI
judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
judge_ca_cert = os.getenv("JUDGE_CA_CERT_PATH")
judge_http_client = httpx.Client(verify=judge_ca_cert) if use_tls else None

print("Starting Final Eval...")

@@ -408,6 +396,7 @@ def find_node_dataset_directories(base_dir: str):
server_url=vllm_server,
serving_gpus=gpu_count,
max_workers=max_workers,
http_client=judge_http_client,
)

shutdown_vllm(vllm_process)
@@ -418,6 +407,7 @@ def find_node_dataset_directories(base_dir: str):
api_key=judge_api_key,
serving_gpus=gpu_count,
max_workers=max_workers,
http_client=judge_http_client,
)

qa_pairs_and_errors.append((overall_score, qa_pairs, error_rate))
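For reference, the verify argument that the new judge_http_client relies on accepts a bool, a path to a CA bundle, or an ssl.SSLContext, so pointing it at the mounted ConfigMap key is all a private CA needs. A small illustrative sketch (the bundle path mirrors the JUDGE_CA_CERT_PATH constant added in pipeline.py below):

import httpx

httpx.Client()                           # verify=True: system trust store (default)
httpx.Client(verify="/tmp/cert/ca.crt")  # trust a private CA bundle by path
httpx.Client(verify=False)               # skip verification (not what this commit does)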
eval/mt_bench/components.py (32 changes: 9 additions & 23 deletions)
@@ -17,33 +17,21 @@ def run_mt_bench_op(
models_folder: str,
output_path: str = "/output/mt_bench_data.json",
best_score_file: Optional[str] = None,
use_tls: bool = False,
) -> NamedTuple("outputs", best_model=str, best_score=float):
import json
import os
import httpx
import subprocess

import torch
from instructlab.eval.mt_bench import MTBenchEvaluator

if judge_ca_cert := os.getenv("JUDGE_CA_CERT_PATH"):
import httpx
import openai

# Create a custom HTTP client
class CustomHttpClient(httpx.Client):
def __init__(self, *args, **kwargs):
# Use the custom CA certificate
kwargs.setdefault("verify", judge_ca_cert)
super().__init__(*args, **kwargs)

# Create a new OpenAI class that uses the custom HTTP client
class CustomOpenAI(openai.OpenAI):
def __init__(self, *args, **kwargs):
custom_client = CustomHttpClient()
super().__init__(http_client=custom_client, *args, **kwargs)

# Monkey patch the OpenAI class in the openai module, so that the eval lib can use it
openai.OpenAI = CustomOpenAI
judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
judge_ca_cert = os.getenv("JUDGE_CA_CERT_PATH")
judge_http_client = httpx.Client(verify=judge_ca_cert) if use_tls else None

def launch_vllm(
model_path: str, gpu_count: int, retries: int = 120, delay: int = 10
@@ -136,10 +124,6 @@ def shutdown_vllm(process: subprocess.Popen, timeout: int = 20):

models_list = os.listdir(models_folder)

judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")

scores = {}
all_mt_bench_data = []

@@ -175,6 +159,7 @@ def shutdown_vllm(process: subprocess.Popen, timeout: int = 20):
server_url=vllm_server,
serving_gpus=gpu_count,
max_workers=max_workers,
http_client=judge_http_client,
)

shutdown_vllm(vllm_process)
@@ -184,6 +169,7 @@ def shutdown_vllm(process: subprocess.Popen, timeout: int = 20):
api_key=judge_api_key,
serving_gpus=gpu_count,
max_workers=max_workers,
http_client=judge_http_client,
)

mt_bench_data = {
pipeline.py (38 changes: 35 additions & 3 deletions)
@@ -11,9 +11,11 @@
mount_pvc,
set_image_pull_policy,
use_config_map_as_env,
use_config_map_as_volume,
use_secret_as_env,
use_secret_as_volume,
)
import os

TEACHER_CONFIG_MAP = "teacher-server"
TEACHER_SECRET = "teacher-server"
@@ -26,6 +28,15 @@
GENERATED_STANDALONE_FILE_NAME = "standalone.py"
DEFAULT_REPO_URL = "https://github.com/instructlab/taxonomy.git"

# Model Serving SSL connection
SDG_CA_CERT_CM_KEY = "ca.crt"
SDG_CA_CERT_ENV_VAR_NAME = "SDG_CA_CERT_PATH"
SDG_CA_CERT_PATH = "/tmp/cert"

JUDGE_CA_CERT_CM_KEY = "ca.crt"
JUDGE_CA_CERT_ENV_VAR_NAME = "JUDGE_CA_CERT_PATH"
JUDGE_CA_CERT_PATH = "/tmp/cert"
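# These constants assume the operator has stored the serving CA bundle in the
# 'teacher-server' / 'judge-server' ConfigMaps under the key 'ca.crt'. A hedged
# sketch of provisioning the judge-server ConfigMap with the Kubernetes Python
# client (the namespace, endpoint, model, and file path are hypothetical):
#
#     from kubernetes import client, config
#
#     config.load_kube_config()  # or load_incluster_config() in-cluster
#     with open("ca.crt") as f:  # local copy of the CA bundle
#         ca_bundle = f.read()
#     client.CoreV1Api().create_namespaced_config_map(
#         namespace="my-ilab-namespace",  # hypothetical
#         body=client.V1ConfigMap(
#             metadata=client.V1ObjectMeta(name="judge-server"),
#             data={
#                 "endpoint": "https://judge.example.com/v1",  # hypothetical
#                 "model": "my-judge-model",                   # hypothetical
#                 "ca.crt": ca_bundle,
#             },
#         ),
#     )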


def ilab_pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]):
"""Wrapper for KFP pipeline, which allows for mocking individual stages."""
@@ -93,6 +104,8 @@ def pipeline(
sdg_pipeline: str = "full", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L122
sdg_max_batch_len: int = 5000, # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L334
sdg_sample_size: float = 1.0, # FIXME: Not present in default config. Not configurable upstream at this point, capability added via https://github.com/instructlab/sdg/pull/432
sdg_use_tls: bool = False,

# Training phase
train_nproc_per_node: int = 2, # FIXME: Not present in default config. Arbitrary value chosen to demonstrate multi-node multi-gpu capabilities. Needs proper reference architecture justification.
train_nnodes: int = 2, # FIXME: Not present in default config. Arbitrary value chosen to demonstrate multi-node multi-gpu capabilities. Needs proper reference architecture justification.
@@ -110,11 +123,15 @@
# MT Bench
mt_bench_max_workers: str = "auto", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L74
mt_bench_merge_system_user_message: bool = False, # https://github.com/instructlab/instructlab/blob/v0.21.2/src/instructlab/model/evaluate.py#L474
mt_bench_use_tls: bool = False,

# Final evaluation
final_eval_max_workers: str = "auto", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L74
final_eval_few_shots: int = 5, # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L56
final_eval_batch_size: str = "auto", # https://github.com/instructlab/instructlab/blob/v0.21.2/tests/testdata/default_config.yaml#L52
final_eval_merge_system_user_message: bool = False, # https://github.com/instructlab/instructlab/blob/v0.21.2/src/instructlab/model/evaluate.py#L474
final_eval_use_tls: bool = False,

# Other options
k8s_storage_class_name: str = "standard", # FIXME: https://github.com/kubeflow/pipelines/issues/11396, https://issues.redhat.com/browse/RHOAIRFE-470
):
@@ -129,6 +146,7 @@ def pipeline(
sdg_pipeline: SDG parameter. Data generation pipeline to use. Available: 'simple', 'full', or a valid path to a directory of pipeline workflow YAML files. Note that 'full' requires a larger teacher model, Mixtral-8x7b.
sdg_max_batch_len: SDG parameter. Maximum tokens per gpu for each batch that will be handled in a single step.
sdg_sample_size: SDG parameter. Represents the sdg skills recipe sampling size as percentage in decimal form.
sdg_use_tls: SDG parameter. Use TLS Certs (defined in the ConfigMap 'teacher-server' under key 'ca.crt') to connect to the Teacher model
train_nproc_per_node: Training parameter. Number of GPUs per each node/worker to use for training.
train_nnodes: Training parameter. Number of nodes/workers to train on.
@@ -146,11 +164,13 @@
mt_bench_max_workers: MT Bench parameter. Number of workers to use for evaluation with mt_bench or mt_bench_branch. Must be a positive integer or 'auto'.
mt_bench_merge_system_user_message: MT Bench parameter. Boolean indicating whether to merge system and user messages (required for Mistral based judges)
mt_bench_use_tls: MT Bench parameter. Use TLS Certs (defined in the ConfigMap 'judge-server' under key 'ca.crt') to connect to the Judge model
final_eval_max_workers: Final model evaluation parameter for MT Bench Branch. Number of workers to use for evaluation with mt_bench or mt_bench_branch. Must be a positive integer or 'auto'.
final_eval_few_shots: Final model evaluation parameter for MMLU. Number of question-answer pairs provided in the context preceding the question used for evaluation.
final_eval_batch_size: Final model evaluation parameter for MMLU. Batch size for evaluation. Valid values are a positive integer or 'auto' to select the largest batch size that will fit in memory.
final_eval_merge_system_user_message: Final model evaluation parameter for MT Bench Branch. Boolean indicating whether to merge system and user messages (required for Mistral based judges)
final_eval_use_tls: Final model evaluation parameter. Use TLS Certs (defined in the ConfigMap 'judge-server' under key 'ca.crt') to connect to the Judge model
k8s_storage_class_name: A Kubernetes StorageClass name for persistent volumes. Selected StorageClass must support RWX PersistentVolumes.
"""
@@ -180,13 +200,17 @@ def pipeline(
repo_branch=sdg_repo_branch,
repo_pr=sdg_repo_pr,
sdg_sampling_size=sdg_sample_size,
use_tls=sdg_use_tls,
)
sdg_task.set_env_variable("HOME", "/tmp")
sdg_task.set_env_variable("HF_HOME", "/tmp")
use_config_map_as_env(
sdg_task, TEACHER_CONFIG_MAP, dict(endpoint="endpoint", model="model")
)
use_secret_as_env(sdg_task, TEACHER_SECRET, {"api_key": "api_key"})
use_config_map_as_volume(sdg_task, TEACHER_CONFIG_MAP, mount_path=SDG_CA_CERT_PATH)
sdg_task.set_env_variable(SDG_CA_CERT_ENV_VAR_NAME, os.path.join(SDG_CA_CERT_PATH, SDG_CA_CERT_CM_KEY))

sdg_task.after(git_clone_task)
mount_pvc(
task=sdg_task,
@@ -330,6 +354,7 @@ def pipeline(
models_folder="/output/phase_2/model/hf_format",
max_workers=mt_bench_max_workers,
merge_system_user_message=mt_bench_merge_system_user_message,
use_tls=mt_bench_use_tls,
)
mount_pvc(
task=run_mt_bench_task,
@@ -349,6 +374,9 @@
)
use_secret_as_env(run_mt_bench_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"})

use_config_map_as_volume(run_mt_bench_task, JUDGE_CONFIG_MAP, mount_path=JUDGE_CA_CERT_PATH)
run_mt_bench_task.set_env_variable(JUDGE_CA_CERT_ENV_VAR_NAME, os.path.join(JUDGE_CA_CERT_PATH, JUDGE_CA_CERT_CM_KEY))

# uncomment if updating image with same tag
# set_image_pull_policy(run_mt_bench_task, "Always")

@@ -362,6 +390,7 @@ def pipeline(
merge_system_user_message=final_eval_merge_system_user_message,
few_shots=final_eval_few_shots,
batch_size=final_eval_batch_size,
use_tls=final_eval_use_tls,
)
mount_pvc(
task=final_eval_task, pvc_name=output_pvc_task.output, mount_path="/output"
@@ -391,6 +420,9 @@

use_secret_as_env(final_eval_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"})

use_config_map_as_volume(final_eval_task, JUDGE_CONFIG_MAP, mount_path=JUDGE_CA_CERT_PATH)
final_eval_task.set_env_variable(JUDGE_CA_CERT_ENV_VAR_NAME, os.path.join(JUDGE_CA_CERT_PATH, JUDGE_CA_CERT_CM_KEY))

final_eval_task.after(run_mt_bench_task)
final_eval_task.set_accelerator_type("nvidia.com/gpu")
final_eval_task.set_accelerator_limit(1)
@@ -592,10 +624,10 @@ def gen_standalone():
# The list of executor names to extract details from to generate the standalone script
executors = {
"exec-data-processing-op": 'data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg_path="{DATA_PVC_SDG_PATH}", model_path="{DATA_PVC_MODEL_PATH}", skills_path="{PREPROCESSED_DATA_SKILLS_PATH}", knowledge_path="{PREPROCESSED_DATA_KNOWLEDGE_PATH}")',
"exec-sdg-op": 'sdg_op(num_instructions_to_generate={num_instructions_to_generate}, pipeline="{sdg_pipeline}", repo_branch="{exec_git_clone_op_repo_branch or ""}", repo_pr={exec_git_clone_op_repo_pr or 0}, taxonomy_path="{TAXONOMY_DATA_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", sdg_sampling_size={sdg_sampling_size})',
"exec-sdg-op": 'sdg_op(num_instructions_to_generate={num_instructions_to_generate}, pipeline="{sdg_pipeline}", repo_branch="{exec_git_clone_op_repo_branch or ""}", repo_pr={exec_git_clone_op_repo_pr or 0}, taxonomy_path="{TAXONOMY_DATA_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", sdg_sampling_size={sdg_sampling_size}, use_tls={sdg_use_tls})',
"exec-git-clone-op": {},
"exec-run-mt-bench-op": 'run_mt_bench_op(best_score_file="{MT_BENCH_SCORES_PATH}",output_path="{MT_BENCH_OUTPUT_PATH}",models_folder="{CANDIDATE_MODEL_PATH_PREFIX}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE})',
"exec-run-final-eval-op": 'run_final_eval_op(mmlu_branch_output="{MMLU_BRANCH_SCORES_PATH}", mt_bench_branch_output="{MT_BENCH_BRANCH_SCORES_PATH}", candidate_model="{CANDIDATE_MODEL_PATH}", taxonomy_path="{TAXONOMY_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", base_branch="", candidate_branch="", base_model_dir="{DATA_PVC_MODEL_PATH}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE}, few_shots={FEW_SHOTS}, batch_size="{BATCH_SIZE}")',
"exec-run-mt-bench-op": 'run_mt_bench_op(best_score_file="{MT_BENCH_SCORES_PATH}",output_path="{MT_BENCH_OUTPUT_PATH}",models_folder="{CANDIDATE_MODEL_PATH_PREFIX}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE}, use_tls={mt_bench_use_tls})',
"exec-run-final-eval-op": 'run_final_eval_op(mmlu_branch_output="{MMLU_BRANCH_SCORES_PATH}", mt_bench_branch_output="{MT_BENCH_BRANCH_SCORES_PATH}", candidate_model="{CANDIDATE_MODEL_PATH}", taxonomy_path="{TAXONOMY_PATH}", sdg_path="{DATA_PVC_SDG_PATH}", base_branch="", candidate_branch="", base_model_dir="{DATA_PVC_MODEL_PATH}", max_workers="{MAX_WORKERS}", merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE}, few_shots={FEW_SHOTS}, batch_size="{BATCH_SIZE}", use_tls={final_eval_use_tls})',
}

details = {}
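# The executor templates above are plain Python-call strings with format-style
# placeholders. A hedged sketch of how one of them renders (substitution via
# str.format is an assumption; the values shown are component defaults or
# hypothetical paths):
#
#     template = (
#         'run_mt_bench_op(best_score_file="{MT_BENCH_SCORES_PATH}",'
#         'output_path="{MT_BENCH_OUTPUT_PATH}",'
#         'models_folder="{CANDIDATE_MODEL_PATH_PREFIX}", '
#         'max_workers="{MAX_WORKERS}", '
#         'merge_system_user_message={MERGE_SYSTEM_USER_MESSAGE}, '
#         'use_tls={mt_bench_use_tls})'
#     )
#     print(template.format(
#         MT_BENCH_SCORES_PATH="/output/mt_bench_scores.json",  # hypothetical
#         MT_BENCH_OUTPUT_PATH="/output/mt_bench_data.json",    # component default
#         CANDIDATE_MODEL_PATH_PREFIX="/output/phase_2/model/hf_format",
#         MAX_WORKERS="auto",
#         MERGE_SYSTEM_USER_MESSAGE=False,
#         mt_bench_use_tls=False,
#     ))
#     # -> run_mt_bench_op(best_score_file="/output/mt_bench_scores.json", ...,
#     #    merge_system_user_message=False, use_tls=False)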
The remaining 2 changed files are not shown.