diff --git a/eval/final/__init__.py b/eval/final/__init__.py
index 62053fe..8edaf62 100644
--- a/eval/final/__init__.py
+++ b/eval/final/__init__.py
@@ -1,5 +1,5 @@
-from .components import run_final_eval_op
+from .components import generate_metrics_report_op, run_final_eval_op
 
 # from . import faked
 
-__all__ = ["run_final_eval_op"]
+__all__ = ["run_final_eval_op", "generate_metrics_report_op"]
diff --git a/eval/final/components.py b/eval/final/components.py
index 469d222..99f5a9a 100644
--- a/eval/final/components.py
+++ b/eval/final/components.py
@@ -1,15 +1,13 @@
 # type: ignore
 # pylint: disable=no-value-for-parameter,import-outside-toplevel,import-error
 
-from kfp.dsl import Artifact, Output, component
+from kfp.dsl import Artifact, Input, Metrics, Output, component
 
-from utils.consts import RHELAI_IMAGE
+from utils.consts import PYTHON_IMAGE, RHELAI_IMAGE
 
 
 @component(base_image=RHELAI_IMAGE, install_kfp_package=False)
 def run_final_eval_op(
-    mmlu_branch_output: Output[Artifact],
-    mt_bench_branch_output: Output[Artifact],
     base_model_dir: str,
     base_branch: str,
     candidate_branch: str,
@@ -20,6 +18,8 @@ def run_final_eval_op(
     candidate_model: str = None,
     taxonomy_path: str = "/input/taxonomy",
     sdg_path: str = "/input/sdg",
+    mmlu_branch_output_path: str = "/output/mmlu_branch",
+    mt_bench_branch_output_path: str = "/output/mt_bench_branch",
 ):
     import json
     import os
@@ -326,8 +326,13 @@ def find_node_dataset_directories(base_dir: str):
             "summary": summary,
         }
 
-        with open(mmlu_branch_output.path, "w", encoding="utf-8") as f:
+        if not os.path.exists(mmlu_branch_output_path):
+            os.makedirs(mmlu_branch_output_path)
+        with open(
+            f"{mmlu_branch_output_path}/mmlu_branch_data.json", "w", encoding="utf-8"
+        ) as f:
             json.dump(mmlu_branch_data, f, indent=4)
+
     else:
         print("No MMLU tasks directories found, skipping MMLU_branch evaluation.")
 
@@ -470,5 +475,41 @@ def find_node_dataset_directories(base_dir: str):
         "summary": summary,
     }
 
-    with open(mt_bench_branch_output.path, "w", encoding="utf-8") as f:
+    if not os.path.exists(mt_bench_branch_output_path):
+        os.makedirs(mt_bench_branch_output_path)
+    with open(
+        f"{mt_bench_branch_output_path}/mt_bench_branch_data.json",
+        "w",
+        encoding="utf-8",
+    ) as f:
         json.dump(mt_bench_branch_data, f, indent=4)
+
+
+@component(base_image=PYTHON_IMAGE, install_kfp_package=False)
+def generate_metrics_report_op(
+    metrics: Output[Metrics],
+):
+    import ast
+    import json
+
+    with open("/output/mt_bench_data.json", "r") as f:
+        mt_bench_data = f.read()
+    mt_bench_data = ast.literal_eval(mt_bench_data)[0]
+
+    metrics.log_metric("mt_bench_best_model", mt_bench_data["model"])
+    metrics.log_metric("mt_bench_best_score", mt_bench_data["overall_score"])
+    metrics.log_metric("mt_bench_best_model_error_rate", mt_bench_data["error_rate"])
+
+    with open("/output/mt_bench_branch/mt_bench_branch_data.json", "r") as f:
+        mt_bench_branch_data = json.loads(f.read())
+
+    metrics.log_metric("mt_bench_branch_score", mt_bench_branch_data["overall_score"])
+    metrics.log_metric(
+        "mt_bench_branch_base_score", mt_bench_branch_data["base_overall_score"]
+    )
+
+    with open("/output/mmlu_branch/mmlu_branch_data.json", "r") as f:
+        mmlu_branch_data = json.loads(f.read())
+
+    metrics.log_metric("mmlu_branch_score", mmlu_branch_data["model_score"])
+    metrics.log_metric("mmlu_branch_base_score", mmlu_branch_data["base_model_score"])
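The change above replaces the two KFP output artifacts of run_final_eval_op with plain JSON files written to the shared /output PVC, and generate_metrics_report_op later reads those files back by hard-coded path, so the two components are coupled only by this file contract. A minimal sketch of exercising that contract locally (hypothetical helper name and sample values; the file names and metric keys are taken from the component code above):

# Hypothetical local check of the /output file contract used by
# generate_metrics_report_op; a temp directory stands in for the PVC mount.
import ast
import json
from pathlib import Path


def collect_report_metrics(output_dir: Path) -> dict:
    # /output/mt_bench_data.json is a Python-literal list, hence ast.literal_eval.
    best = ast.literal_eval((output_dir / "mt_bench_data.json").read_text())[0]
    branch = json.loads(
        (output_dir / "mt_bench_branch" / "mt_bench_branch_data.json").read_text()
    )
    mmlu = json.loads(
        (output_dir / "mmlu_branch" / "mmlu_branch_data.json").read_text()
    )
    # Same keys that generate_metrics_report_op logs via metrics.log_metric().
    return {
        "mt_bench_best_model": best["model"],
        "mt_bench_best_score": best["overall_score"],
        "mt_bench_best_model_error_rate": best["error_rate"],
        "mt_bench_branch_score": branch["overall_score"],
        "mt_bench_branch_base_score": branch["base_overall_score"],
        "mmlu_branch_score": mmlu["model_score"],
        "mmlu_branch_base_score": mmlu["base_model_score"],
    }


if __name__ == "__main__":
    out = Path("/tmp/ilab-output-sample")  # stand-in for the mounted /output PVC
    (out / "mt_bench_branch").mkdir(parents=True, exist_ok=True)
    (out / "mmlu_branch").mkdir(parents=True, exist_ok=True)
    (out / "mt_bench_data.json").write_text(
        str([{"model": "candidate", "overall_score": 7.1, "error_rate": 0.0}])
    )
    (out / "mt_bench_branch" / "mt_bench_branch_data.json").write_text(
        json.dumps({"overall_score": 6.7, "base_overall_score": 6.5})
    )
    (out / "mmlu_branch" / "mmlu_branch_data.json").write_text(
        json.dumps({"model_score": 0.61, "base_model_score": 0.58})
    )
    print(collect_report_metrics(out))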
diff --git a/pipeline.py b/pipeline.py
index 00d3df2..2a6643e 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -83,7 +83,7 @@ def ilab_pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]):
     )
 
     # Imports for evaluation
-    from eval.final import run_final_eval_op
+    from eval.final import generate_metrics_report_op, run_final_eval_op
     from eval.mt_bench import run_mt_bench_op
 
     @dsl.pipeline(
@@ -452,17 +452,27 @@ def pipeline(
             mount_path="/output",
         )
 
-        output_pvc_delete_task = DeletePVC(pvc_name=output_pvc_task.output)
-        output_pvc_delete_task.after(
-            output_model_task, output_mt_bench_task, final_eval_task
-        )
-
         sdg_pvc_delete_task = DeletePVC(pvc_name=sdg_input_pvc_task.output)
         sdg_pvc_delete_task.after(final_eval_task)
 
         model_pvc_delete_task = DeletePVC(pvc_name=model_pvc_task.output)
         model_pvc_delete_task.after(final_eval_task)
 
+        generate_metrics_report_task = generate_metrics_report_op()
+        generate_metrics_report_task.after(output_mt_bench_task, final_eval_task)
+        generate_metrics_report_task.set_caching_options(False)
+        mount_pvc(
+            task=generate_metrics_report_task,
+            pvc_name=output_pvc_task.output,
+            mount_path="/output",
+        )
+
+        output_pvc_delete_task = DeletePVC(pvc_name=output_pvc_task.output)
+        output_pvc_delete_task.after(
+            output_model_task,
+            generate_metrics_report_task,
+        )
+
         return
 
     return pipeline
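The pipeline.yaml hunks that follow are the compiled counterpart of the pipeline.py changes above, so both files have to be regenerated together whenever the DAG wiring changes. A minimal recompilation sketch (assuming ilab_pipeline_wrapper is importable from pipeline.py and that an empty mock list selects the real components, as the wrapper's return value above suggests; the repository may provide its own regeneration command):

# Hypothetical recompile step to keep pipeline.yaml in sync with pipeline.py.
from kfp import compiler

from pipeline import ilab_pipeline_wrapper

ilab_pipeline = ilab_pipeline_wrapper(mock=[])
compiler.Compiler().compile(ilab_pipeline, package_path="pipeline.yaml")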
diff --git a/pipeline.yaml b/pipeline.yaml
index ef5d586..29e70b6 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -30,6 +30,8 @@
 #    train_num_warmup_steps_phase_2: int [Default: 1000.0]
 #    train_save_samples: int [Default: 250000.0]
 #    train_seed: int [Default: 42.0]
+# Outputs:
+#    generate-metrics-report-op-metrics: system.Metrics
 components:
   comp-createpvc:
     executorLabel: exec-createpvc
@@ -266,6 +268,14 @@ components:
           description: Name of the PVC to delete. Supports passing a runtime-generated
             name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``.
           parameterType: STRING
+  comp-generate-metrics-report-op:
+    executorLabel: exec-generate-metrics-report-op
+    outputDefinitions:
+      artifacts:
+        metrics:
+          artifactType:
+            schemaTitle: system.Metrics
+            schemaVersion: 0.0.1
   comp-git-clone-op:
     executorLabel: exec-git-clone-op
     inputDefinitions:
@@ -464,6 +474,14 @@ components:
           parameterType: STRING
         merge_system_user_message:
           parameterType: BOOLEAN
+        mmlu_branch_output_path:
+          defaultValue: /output/mmlu_branch
+          isOptional: true
+          parameterType: STRING
+        mt_bench_branch_output_path:
+          defaultValue: /output/mt_bench_branch
+          isOptional: true
+          parameterType: STRING
         sdg_path:
           defaultValue: /input/sdg
           isOptional: true
@@ -472,16 +490,6 @@ components:
           defaultValue: /input/taxonomy
           isOptional: true
          parameterType: STRING
-    outputDefinitions:
-      artifacts:
-        mmlu_branch_output:
-          artifactType:
-            schemaTitle: system.Artifact
-            schemaVersion: 0.0.1
-        mt_bench_branch_output:
-          artifactType:
-            schemaTitle: system.Artifact
-            schemaVersion: 0.0.1
   comp-run-mt-bench-op:
     executorLabel: exec-run-mt-bench-op
     inputDefinitions:
@@ -658,6 +666,41 @@ deploymentSpec:
     exec-deletepvc-3:
       container:
         image: argostub/deletepvc
+    exec-generate-metrics-report-op:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - generate_metrics_report_op
+        command:
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef generate_metrics_report_op(\n metrics: Output[Metrics],\n\
+          ):\n import ast\n import json\n\n with open(\"/output/mt_bench_data.json\"\
+          , \"r\") as f:\n mt_bench_data = f.read()\n mt_bench_data = ast.literal_eval(mt_bench_data)[0]\n\
+          \n metrics.log_metric(\"mt_bench_best_model\", mt_bench_data[\"model\"\
+          ])\n metrics.log_metric(\"mt_bench_best_score\", mt_bench_data[\"overall_score\"\
+          ])\n metrics.log_metric(\"mt_bench_best_model_error_rate\", mt_bench_data[\"\
+          error_rate\"])\n\n with open(\"/output/mt_bench_branch/mt_bench_branch_data.json\"\
+          , \"r\") as f:\n mt_bench_branch_data = json.loads(f.read())\n\n\
+          \ metrics.log_metric(\"mt_bench_branch_score\", mt_bench_branch_data[\"\
+          overall_score\"])\n metrics.log_metric(\n \"mt_bench_branch_base_score\"\
+          , mt_bench_branch_data[\"base_overall_score\"]\n )\n\n with open(\"\
+          /output/mmlu_branch/mmlu_branch_data.json\", \"r\") as f:\n mmlu_branch_data\
+          \ = json.loads(f.read())\n\n metrics.log_metric(\"mmlu_branch_score\"\
+          , mmlu_branch_data[\"model_score\"])\n metrics.log_metric(\"mmlu_branch_base_score\"\
+          , mmlu_branch_data[\"base_model_score\"])\n\n"
+        image: quay.io/modh/odh-generic-data-science-notebook:v3-2024b-20241111
     exec-git-clone-op:
       container:
         args:
@@ -1138,37 +1181,38 @@ deploymentSpec:
 
           '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
-          \ *\n\ndef run_final_eval_op(\n mmlu_branch_output: Output[Artifact],\n\
-          \ mt_bench_branch_output: Output[Artifact],\n base_model_dir: str,\n\
-          \ base_branch: str,\n candidate_branch: str,\n max_workers: str,\n\
-          \ few_shots: int,\n batch_size: str,\n merge_system_user_message:\
-          \ bool,\n candidate_model: str = None,\n taxonomy_path: str = \"/input/taxonomy\"\
-          ,\n sdg_path: str = \"/input/sdg\",\n):\n import json\n import\
-          \ os\n import subprocess\n\n import httpx\n import torch\n from\
-          \ instructlab.eval.mmlu import MMLUBranchEvaluator\n from instructlab.eval.mt_bench\
-          \ import MTBenchBranchEvaluator\n from instructlab.model.evaluate import\
-          \ qa_pairs_to_qna_to_avg_scores, sort_score\n\n judge_api_key = os.getenv(\"\
-          JUDGE_API_KEY\", \"\")\n judge_model_name = os.getenv(\"JUDGE_NAME\"\
-          )\n judge_endpoint = os.getenv(\"JUDGE_ENDPOINT\")\n judge_ca_cert_path\
-          \ = os.getenv(\"JUDGE_CA_CERT_PATH\")\n use_tls = os.path.exists(judge_ca_cert_path)\
-          \ and (\n os.path.getsize(judge_ca_cert_path) > 0\n )\n judge_http_client\
-          \ = httpx.Client(verify=judge_ca_cert_path) if use_tls else None\n\n \
-          \ print(\"Starting Final Eval...\")\n\n def launch_vllm(\n model_path:\
-          \ str, gpu_count: int, retries: int = 120, delay: int = 10\n ) -> tuple:\n\
-          \ import subprocess\n import sys\n import time\n\n\
-          \ import requests\n from instructlab.model.backends.common\
-          \ import free_tcp_ipv4_port\n\n free_port = free_tcp_ipv4_port(\"\
-          127.0.0.1\")\n port = str(free_port)\n vllm_server = f\"http://127.0.0.1:{port}/v1\"\
-          \n\n command = [\n sys.executable,\n \"-m\"\
-          ,\n \"vllm.entrypoints.openai.api_server\",\n \"--port\"\
-          ,\n port,\n \"--model\",\n model_path,\n\
-          \ ]\n if gpu_count > 0:\n command += [\n \
-          \ \"--tensor-parallel-size\",\n str(gpu_count),\n\
-          \ ]\n\n process = subprocess.Popen(args=command)\n\n \
-          \ print(f\"Waiting for vLLM server to start at {vllm_server}...\"\
-          )\n\n for attempt in range(retries):\n try:\n \
-          \ response = requests.get(f\"{vllm_server}/models\", timeout=10)\n\
-          \ if response.status_code == 200:\n print(f\"\
+          \ *\n\ndef run_final_eval_op(\n base_model_dir: str,\n base_branch:\
+          \ str,\n candidate_branch: str,\n max_workers: str,\n few_shots:\
+          \ int,\n batch_size: str,\n merge_system_user_message: bool,\n \
+          \ candidate_model: str = None,\n taxonomy_path: str = \"/input/taxonomy\"\
+          ,\n sdg_path: str = \"/input/sdg\",\n mmlu_branch_output_path: str\
+          \ = \"/output/mmlu_branch\",\n mt_bench_branch_output_path: str = \"\
+          /output/mt_bench_branch\",\n):\n import json\n import os\n import\
+          \ subprocess\n\n import httpx\n import torch\n from instructlab.eval.mmlu\
+          \ import MMLUBranchEvaluator\n from instructlab.eval.mt_bench import\
+          \ MTBenchBranchEvaluator\n from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores,\
+          \ sort_score\n\n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n\
+          \ judge_model_name = os.getenv(\"JUDGE_NAME\")\n judge_endpoint =\
+          \ os.getenv(\"JUDGE_ENDPOINT\")\n judge_ca_cert_path = os.getenv(\"JUDGE_CA_CERT_PATH\"\
+          )\n use_tls = os.path.exists(judge_ca_cert_path) and (\n os.path.getsize(judge_ca_cert_path)\
+          \ > 0\n )\n judge_http_client = httpx.Client(verify=judge_ca_cert_path)\
+          \ if use_tls else None\n\n print(\"Starting Final Eval...\")\n\n def\
+          \ launch_vllm(\n model_path: str, gpu_count: int, retries: int =\
+          \ 120, delay: int = 10\n ) -> tuple:\n import subprocess\n \
+          \ import sys\n import time\n\n import requests\n \
+          \ from instructlab.model.backends.common import free_tcp_ipv4_port\n\n\
+          \ free_port = free_tcp_ipv4_port(\"127.0.0.1\")\n port = str(free_port)\n\
+          \ vllm_server = f\"http://127.0.0.1:{port}/v1\"\n\n command\
+          \ = [\n sys.executable,\n \"-m\",\n \"\
+          vllm.entrypoints.openai.api_server\",\n \"--port\",\n \
+          \ port,\n \"--model\",\n model_path,\n \
+          \ ]\n if gpu_count > 0:\n command += [\n \
+          \ \"--tensor-parallel-size\",\n str(gpu_count),\n \
+          \ ]\n\n process = subprocess.Popen(args=command)\n\n \
+          \ print(f\"Waiting for vLLM server to start at {vllm_server}...\")\n\n\
+          \ for attempt in range(retries):\n try:\n \
+          \ response = requests.get(f\"{vllm_server}/models\", timeout=10)\n \
+          \ if response.status_code == 200:\n print(f\"\
           vLLM server is up and running at {vllm_server}.\")\n \
           \ return process, vllm_server\n except requests.ConnectionError:\n\
           \ pass\n\n print(\n f\"Server not
@@ -1294,9 +1338,11 @@ deploymentSpec:
           : candidate_model,\n \"model_score\": round(overall_score, 2),\n\
           \ \"base_model\": base_model_dir,\n \"base_model_score\"\
           : round(base_overall_score, 2),\n \"summary\": summary,\n \
-          \ }\n\n with open(mmlu_branch_output.path, \"w\", encoding=\"\
-          utf-8\") as f:\n json.dump(mmlu_branch_data, f, indent=4)\n \
-          \ else:\n print(\"No MMLU tasks directories found, skipping MMLU_branch\
+          \ }\n\n if not os.path.exists(mmlu_branch_output_path):\n \
+          \ os.makedirs(mmlu_branch_output_path)\n with open(\n \
+          \ f\"{mmlu_branch_output_path}/mmlu_branch_data.json\", \"w\", encoding=\"\
+          utf-8\"\n ) as f:\n json.dump(mmlu_branch_data, f, indent=4)\n\
+          \n else:\n print(\"No MMLU tasks directories found, skipping MMLU_branch\
           \ evaluation.\")\n\n # MT_BENCH_BRANCH\n\n print(\"Starting MT_BENCH_BRANCH\
           \ ...\")\n\n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n \
           \ judge_model_name = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"
@@ -1362,8 +1408,11 @@ deploymentSpec:
           \ \"judge_model\": judge_model_name,\n \"max_score\": \"10.0\"\
           ,\n \"overall_score\": overall_score,\n \"base_overall_score\"\
           : base_overall_score,\n \"error_rate\": error_rate,\n \"summary\"\
-          : summary,\n }\n\n with open(mt_bench_branch_output.path, \"w\", encoding=\"\
-          utf-8\") as f:\n json.dump(mt_bench_branch_data, f, indent=4)\n\n"
+          : summary,\n }\n\n if not os.path.exists(mt_bench_branch_output_path):\n\
+          \ os.makedirs(mt_bench_branch_output_path)\n with open(\n \
+          \ f\"{mt_bench_branch_output_path}/mt_bench_branch_data.json\",\n \
+          \ \"w\",\n encoding=\"utf-8\",\n ) as f:\n json.dump(mt_bench_branch_data,\
+          \ f, indent=4)\n\n"
         env:
         - name: HOME
           value: /tmp
@@ -1646,6 +1695,12 @@ pipelineInfo:
   name: instructlab
 root:
   dag:
+    outputs:
+      artifacts:
+        generate-metrics-report-op-metrics:
+          artifactSelectors:
+          - outputArtifactKey: metrics
+            producerSubtask: generate-metrics-report-op
     tasks:
       createpvc:
         cachingOptions:
@@ -1731,16 +1786,14 @@ root:
         componentRef:
           name: comp-deletepvc
         dependentTasks:
-        - createpvc-3
-        - pvc-to-model-op
-        - pvc-to-mt-bench-op
+        - createpvc
         - run-final-eval-op
         inputs:
           parameters:
             pvc_name:
               taskOutputParameter:
                 outputParameterKey: name
-                producerTask: createpvc-3
+                producerTask: createpvc
         taskInfo:
           name: deletepvc
       deletepvc-2:
@@ -1749,14 +1802,14 @@ root:
         componentRef:
           name: comp-deletepvc-2
         dependentTasks:
-        - createpvc
+        - createpvc-2
         - run-final-eval-op
         inputs:
          parameters:
             pvc_name:
               taskOutputParameter:
                 outputParameterKey: name
-                producerTask: createpvc
+                producerTask: createpvc-2
         taskInfo:
           name: deletepvc-2
       deletepvc-3:
@@ -1765,16 +1818,27 @@ root:
         componentRef:
           name: comp-deletepvc-3
         dependentTasks:
-        - createpvc-2
-        - run-final-eval-op
+        - createpvc-3
+        - generate-metrics-report-op
+        - pvc-to-model-op
         inputs:
           parameters:
             pvc_name:
               taskOutputParameter:
                 outputParameterKey: name
-                producerTask: createpvc-2
+                producerTask: createpvc-3
         taskInfo:
           name: deletepvc-3
+      generate-metrics-report-op:
+        cachingOptions: {}
+        componentRef:
+          name: comp-generate-metrics-report-op
+        dependentTasks:
+        - createpvc-3
+        - pvc-to-mt-bench-op
+        - run-final-eval-op
+        taskInfo:
+          name: generate-metrics-report-op
       git-clone-op:
         cachingOptions: {}
        componentRef:
@@ -2227,6 +2291,12 @@ root:
       description: Training parameter. Random seed for initializing training.
       isOptional: true
       parameterType: NUMBER_INTEGER
+  outputDefinitions:
+    artifacts:
+      generate-metrics-report-op-metrics:
+        artifactType:
+          schemaTitle: system.Metrics
+          schemaVersion: 0.0.1
 schemaVersion: 2.1.0
 sdkVersion: kfp-2.9.0
 ---
@@ -2244,6 +2314,12 @@ platforms:
         taskOutputParameter:
           outputParameterKey: name
           producerTask: createpvc
+    exec-generate-metrics-report-op:
+      pvcMount:
+      - mountPath: /output
+        taskOutputParameter:
+          outputParameterKey: name
+          producerTask: createpvc-3
     exec-git-clone-op:
       pvcMount:
       - mountPath: /data