From e50c890e1ff644f48ee4f4651e7579b01fa488ad Mon Sep 17 00:00:00 2001 From: binbin Deng <108676127+plusbang@users.noreply.github.com> Date: Tue, 18 Jun 2024 09:55:40 +0800 Subject: [PATCH] Support finishing PP inference once `eos_token_id` is found (#11336) --- .../GPU/Pipeline-Parallel-Inference/README.md | 3 +- .../Pipeline-Parallel-Inference/generate.py | 1 + .../run_qwen1.5_arc_2_card.sh | 4 ++ .../transformers/pipeline_parallel.py | 59 ++++++++++++++++++- 4 files changed, 65 insertions(+), 2 deletions(-) diff --git a/python/llm/example/GPU/Pipeline-Parallel-Inference/README.md b/python/llm/example/GPU/Pipeline-Parallel-Inference/README.md index bbe6f543291..a2efa1df992 100644 --- a/python/llm/example/GPU/Pipeline-Parallel-Inference/README.md +++ b/python/llm/example/GPU/Pipeline-Parallel-Inference/README.md @@ -11,6 +11,7 @@ To run this example with IPEX-LLM on Intel GPUs, we have some recommended requir - [meta-llama/Meta-Llama-3-8B-Instruct](./run_llama_arc_2_card.sh) - [Qwen/Qwen1.5-7B-Chat](./run_qwen1.5_arc_2_card.sh) - [Qwen/Qwen1.5-14B-Chat](./run_qwen1.5_arc_2_card.sh) +- [Qwen/Qwen1.5-32B-Chat](./run_qwen1.5_arc_2_card.sh) - [baichuan-inc/Baichuan2-7B-Chat](./run_baichuan2_arc_2_card.sh) - [baichuan-inc/Baichuan2-13B-Chat](./run_baichuan2_arc_2_card.sh) - [microsoft/Phi-3-mini-4k-instruct](./run_phi3_arc_2_card.sh) @@ -57,7 +58,7 @@ bash run_llama_arc_2_card.sh
Show Qwen1.5 example -#### Run Qwen1.5-7B-Chat / Qwen1.5-14B-Chat on two Intel Arc A770 +#### Run Qwen1.5-7B-Chat / Qwen1.5-14B-Chat / Qwen1.5-32B-Chat on two Intel Arc A770 You could specify `--repo-id-or-model-path` in the test script to be the huggingface repo id for Qwen1.5 to be downloaded, or the path to the huggingface checkpoint folder. Besides, you could change `NUM_GPUS` to the number of GPUs you have on your machine. diff --git a/python/llm/example/GPU/Pipeline-Parallel-Inference/generate.py b/python/llm/example/GPU/Pipeline-Parallel-Inference/generate.py index 5104c7010f0..1be06e7072d 100644 --- a/python/llm/example/GPU/Pipeline-Parallel-Inference/generate.py +++ b/python/llm/example/GPU/Pipeline-Parallel-Inference/generate.py @@ -46,6 +46,7 @@ optimize_model=True, trust_remote_code=True, use_cache=True, + torch_dtype=torch.float16, pipeline_parallel_stages=args.gpu_num) # Load tokenizer diff --git a/python/llm/example/GPU/Pipeline-Parallel-Inference/run_qwen1.5_arc_2_card.sh b/python/llm/example/GPU/Pipeline-Parallel-Inference/run_qwen1.5_arc_2_card.sh index f3b49bbffc1..fff43aec4a2 100644 --- a/python/llm/example/GPU/Pipeline-Parallel-Inference/run_qwen1.5_arc_2_card.sh +++ b/python/llm/example/GPU/Pipeline-Parallel-Inference/run_qwen1.5_arc_2_card.sh @@ -34,3 +34,7 @@ CCL_ZE_IPC_EXCHANGE=sockets torchrun --standalone --nnodes=1 --nproc-per-node $N # # To run Qwen1.5-14B-Chat # CCL_ZE_IPC_EXCHANGE=sockets torchrun --standalone --nnodes=1 --nproc-per-node $NUM_GPUS \ # generate.py --repo-id-or-model-path 'Qwen/Qwen1.5-14B-Chat' --gpu-num $NUM_GPUS + +# # To run Qwen1.5-32B-Chat +# CCL_ZE_IPC_EXCHANGE=sockets torchrun --standalone --nnodes=1 --nproc-per-node $NUM_GPUS \ +# generate.py --repo-id-or-model-path 'Qwen/Qwen1.5-32B-Chat' --gpu-num $NUM_GPUS diff --git a/python/llm/src/ipex_llm/transformers/pipeline_parallel.py b/python/llm/src/ipex_llm/transformers/pipeline_parallel.py index a031d001fe4..92cb7d726a3 100644 --- a/python/llm/src/ipex_llm/transformers/pipeline_parallel.py +++ b/python/llm/src/ipex_llm/transformers/pipeline_parallel.py @@ -25,6 +25,9 @@ import numpy as np from typing import Callable, List, Optional from transformers import GenerationConfig, LogitsProcessorList, StoppingCriteriaList +from ipex_llm.utils.common import invalidInputError +import logging +logger = logging.getLogger(__name__) # patch GenerationMixin.generate from transformers import GenerationMixin @@ -118,12 +121,34 @@ def generate( **kwargs, ): if hasattr(self, 'pipeline_parallel_stages') and self.pipeline_parallel_stages > 1: + # priority: `generation_config` argument > `model.generation_config` + if generation_config is None: + if ( + self.generation_config._from_model_config + and self.generation_config._original_object_hash == hash(self.generation_config) + and self.config._has_non_default_generation_parameters() + ): + new_generation_config = GenerationConfig.from_model_config(self.config) + if new_generation_config != self.generation_config: + self.generation_config = new_generation_config + generation_config = self.generation_config + + if generation_config.pad_token_id is None and generation_config.eos_token_id is not None: + eos_token_id = generation_config.eos_token_id + if isinstance(eos_token_id, list): + eos_token_id = eos_token_id[0] + logger.warning("Setting `pad_token_id` to `eos_token_id`: " + f"{eos_token_id} for open-end generation.") + generation_config.pad_token_id = eos_token_id + if generation_config is not None and generation_config.max_new_tokens is not None: max_new_tokens = generation_config.max_new_tokens else: max_new_tokens = kwargs.get("max_new_tokens", None) + return self.pipeline_parallel_generate(inputs=inputs, - max_new_tokens=max_new_tokens,) + max_new_tokens=max_new_tokens, + generation_config=generation_config,) return original_generate(self, inputs=inputs, @@ -143,6 +168,7 @@ def generate( def pipeline_parallel_generate(self, inputs: Optional[torch.Tensor] = None, max_new_tokens: int = 32, + generation_config: Optional[GenerationConfig] = None, **kwargs): local_rank = dist.get_rank() pre_rank = (local_rank - 1) % self.pipeline_parallel_stages @@ -154,12 +180,22 @@ def pipeline_parallel_generate(self, self.first_token_time = 0 self.next_token_time = [] + pad_token_id = generation_config.pad_token_id + eos_token_id = generation_config.eos_token_id + if isinstance(eos_token_id, int): + eos_token_id = [eos_token_id] + eos_token_id_tensor = torch.tensor(eos_token_id).to(inputs.device) \ + if eos_token_id is not None else None + _input_ids = None _past_key_values = None bs = inputs.shape[0] output_ids = inputs.clone() step = 0 + # keep track of which sequences are already finished + unfinished_sequences = torch.ones(inputs.shape[0], dtype=torch.long, device=inputs.device) + this_peer_finished = False while True: if step >= max_new_tokens: break @@ -190,6 +226,14 @@ def pipeline_parallel_generate(self, _input_ids = next_ids output_ids = torch.cat([output_ids, next_ids], dim=-1) + # finished sentences should have their next token be a padding token + next_ids = next_ids.squeeze() + if eos_token_id is not None: + if pad_token_id is None: + invalidInputError(False, "If `eos_token_id` is defined, " + "make sure that `pad_token_id` is defined.") + next_ids = next_ids * unfinished_sequences + pad_token_id * (1 - unfinished_sequences) + if isinstance(outputs.past_key_values, tuple) and local_rank != 0: value_placeholder = torch.empty_like((outputs.past_key_values)[-1][0]) past_key_values_placeholder = tuple( @@ -204,6 +248,19 @@ def pipeline_parallel_generate(self, self.first_token_time = toc - tic else: self.next_token_time.append(toc - tic) + + # if eos_token was found in one sentence, set sentence to finished + if eos_token_id_tensor is not None: + unfinished_sequences = unfinished_sequences.mul( + next_ids.tile(eos_token_id_tensor.shape[0], 1) + .ne(eos_token_id_tensor.unsqueeze(1)).prod(dim=0) + ) + # stop when each sentence is finished + if unfinished_sequences.max() == 0: + this_peer_finished = True + if this_peer_finished: + break + step += 1 if self.device.type == 'xpu': torch.xpu.synchronize()