LLM: Refactor Pipeline-Parallel-FastAPI example #11319

Merged: 27 commits, Jun 25, 2024
2 changes: 1 addition & 1 deletion python/llm/example/GPU/Pipeline-Parallel-FastAPI/README.md
@@ -22,7 +22,7 @@ pip install mpi4py fastapi uvicorn openai
 pip install gradio # for gradio web UI
 conda install -c conda-forge -y gperftools=2.10 # to enable tcmalloc
 
-pip install transformers==4.31.0 # for llama2 models
+pip install transformers==4.37.0
 ```
 
 ### 2. Run pipeline parallel serving on multiple GPUs
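The README now carries a single `transformers==4.37.0` pin instead of the llama2-specific `4.31.0` pin, matching the single `Meta-Llama-3-8B-Instruct` model name used in the refactored benchmark payload below. A quick post-install sanity check (a minimal sketch; the version string is the only thing taken from the README above):

```python
import transformers

# The README pins transformers==4.37.0; fail fast if something else is installed.
assert transformers.__version__ == "4.37.0", transformers.__version__
```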
112 changes: 52 additions & 60 deletions python/llm/example/GPU/Pipeline-Parallel-FastAPI/benchmark.py
@@ -30,29 +30,36 @@ def perform_request(session, url, payload, headers):
     start_time = time.perf_counter()
     with session.post(url, json=payload, headers=headers, stream=True) as response:
         response.raise_for_status()
 
         first_token_time = None
         last_token_time = 0
         first_token_inference_time = None
         next_token_inference_time = None
         next_token_time = []
         i = 0
         for line in response.iter_lines():
 
             token_time = time.perf_counter() - start_time
             if line:
-                data = line.decode("utf-8").strip()
-                i = i + 1
-                try:
-                    json_data = json.loads(data)
-                    if json_data["message"] is not None:
-                        if first_token_time is None:
-                            first_token_time = token_time
-                        else:
-                            next_token_time.append(token_time - last_token_time)
-                        last_token_time = token_time
-                except json.JSONDecodeError:
-                    pass
+                data = line.decode('utf-8').strip()
+                if data.startswith('data: '):
+                    data = data[len('data: '):]
+                i = i + 1
+                try:
+                    json_data = json.loads(data)
+                    if 'choices' in json_data and len(json_data['choices']) > 0:
+                        choice = json_data['choices'][0]
+                        if 'finish_reason' in choice and (choice['finish_reason'] == 'length' or choice['finish_reason'] == 'stop'):
+                            if 'first_token_time' in choice and isinstance(choice['first_token_time'], float):
+                                first_token_inference_time = choice['first_token_time']
+                            if 'rest_token_time' in choice and isinstance(choice['rest_token_time'], float):
+                                next_token_inference_time = choice['rest_token_time']
+                        else:
+                            if first_token_time is None:
+                                first_token_time = token_time
+                            else:
+                                next_token_time.append(token_time - last_token_time)
+                            last_token_time = token_time
+                except json.JSONDecodeError:
+                    pass
         end_time = time.perf_counter()
         return (
             first_token_time,
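For readers tracing the new parsing branch: the client now consumes an OpenAI-style completions stream, where each payload line is prefixed with `data: ` and the final chunk (the one carrying `finish_reason`) also carries this server's extra timing fields `first_token_time` and `rest_token_time`. A standalone sketch of the same client-side pattern, with the endpoint and prompt as placeholder assumptions:

```python
import json
import time

import requests

def stream_latencies(url, payload):
    """Sketch of the SSE-parsing pattern used in this diff: record client-side
    first-token latency and the gaps between subsequent streamed chunks.
    Assumes an OpenAI-compatible /v1/completions server streaming 'data: ...' lines."""
    start = time.perf_counter()
    first_token_time, gaps, last = None, [], None
    with requests.post(url, json=payload, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            now = time.perf_counter() - start
            if not line:
                continue
            data = line.decode("utf-8").strip()
            if data.startswith("data: "):
                data = data[len("data: "):]
            if data == "[DONE]":  # OpenAI-style streams end with this sentinel
                break
            try:
                chunk = json.loads(data)
            except json.JSONDecodeError:
                continue
            if chunk.get("choices"):
                if first_token_time is None:
                    first_token_time = now
                elif last is not None:
                    gaps.append(now - last)
                last = now
    return first_token_time, gaps
```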
@@ -76,11 +83,11 @@ def extend_list_to_length(lst, target_length):
 def benchmark(
     llm_urls,
     prompt,
+    num_warmup_requests,
     num_requests,
     max_concurrent_requests,
     max_tokens,
     prompt_length,
-    is_warmup=False,
 ):
 
     headers = {"Content-Type": "application/json"}
@@ -92,6 +99,8 @@
     next_token_inference_times = []
     cur_url_index = 0
 
+    num_requests = num_requests + num_warmup_requests
+
     with requests.Session() as session:
         with ThreadPoolExecutor(max_workers=max_concurrent_requests) as executor:
             llm_url = llm_urls[cur_url_index]
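This is the core of the refactor: instead of a separate warm-up invocation, warm-up requests are folded into a single submission batch. `num_requests` is bumped by `num_warmup_requests` here, and further down the `as_completed` loop restarts the timer and only starts recording once the first `num_warmup_requests` completions have drained (since every request is identical, it does not matter which futures happen to finish first). A condensed sketch of that accounting, with a hypothetical `completion_times` iterable standing in for the real futures:

```python
import time

def measured_latencies(completion_times, num_warmup_requests):
    """Sketch of the warm-up-as-prefix accounting used in this diff."""
    recorded = []
    start_time = None
    cur_index = 0
    for latency in completion_times:
        if cur_index == num_warmup_requests:
            start_time = time.perf_counter()  # timer excludes the warm-up drain
        cur_index = cur_index + 1
        if cur_index > num_warmup_requests:  # discard the first N completions
            recorded.append(latency)
    total_time = time.perf_counter() - start_time if start_time else 0.0
    return recorded, total_time
```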
@@ -101,8 +110,17 @@
             cur_len = len(cur_llm_urls)
 
             payload = {
+                "model": "Meta-Llama-3-8B-Instruct",
                 "prompt": prompt,
-                "n_predict": max_tokens,
+                "max_tokens": max_tokens,
+                "stream": True,
+                # for vllm openai api server
+                "ignore_eos": True,
+                "n": 1,
+                "best_of": 1,
+                "use_beam_search": False,
+                "temperature": 0.0,
+                "top_p": 1.0,
             }
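Aside on this payload: it now follows the OpenAI completions schema, so the same client can hit either this example's server or a vLLM OpenAI-compatible server; per the comment in the diff, `ignore_eos`, `best_of`, and `use_beam_search` are vLLM-side extensions that other servers may simply ignore. A hypothetical one-off smoke test mirroring the payload (prompt and non-streaming mode are assumptions, not part of the benchmark):

```python
import requests

# Assumes the example server is already listening on localhost:8000.
resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={
        "model": "Meta-Llama-3-8B-Instruct",
        "prompt": "Once upon a time",
        "max_tokens": 32,
        "stream": False,     # the benchmark streams; False keeps this check simple
        "temperature": 0.0,  # greedy decoding, matching the benchmark settings
    },
    timeout=300,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["text"])
```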
             futures = [
                 executor.submit(
@@ -115,14 +133,13 @@
                 for index in range(num_requests)
             ]
 
-            start_time = time.perf_counter()
+            phase = "Benchmarking"
 
-            if is_warmup:
-                phase = "Warm Up"
-            else:
-                phase = "Benchmarking"
             with tqdm(total=num_requests, desc=phase, unit="req", ncols=100) as pbar:
+                cur_index = 0
                 for future in concurrent.futures.as_completed(futures):
+                    if cur_index == num_warmup_requests:
+                        start_time = time.perf_counter()
                     try:
                         (
                             first_token_latency,
@@ -131,21 +148,21 @@
                             first_token_inference_time,
                             next_token_inference_time,
                         ) = future.result()
-                        first_token_latencies.append(first_token_latency)
-                        next_token_latencies.append(next_token_latency)
-                        total_responce_times.append(total_responce_time)
-                        if first_token_inference_time:
-                            first_token_inference_times.append(
-                                first_token_inference_time
-                            )
-                        if next_token_inference_time:
-                            next_token_inference_times.append(next_token_inference_time)
+                        cur_index = cur_index + 1
+                        if cur_index > num_warmup_requests:
+                            first_token_latencies.append(first_token_latency)
+                            next_token_latencies.append(next_token_latency)
+                            total_responce_times.append(total_responce_time)
+                            if first_token_inference_time:
+                                first_token_inference_times.append(
+                                    first_token_inference_time
+                                )
+                            if next_token_inference_time:
+                                next_token_inference_times.append(next_token_inference_time)
                     except Exception as e:
                         print(f"Request failed: {e}")
                     pbar.update(1)
 
-            if is_warmup:
-                return
             total_time = time.perf_counter() - start_time
             log_file = f"{max_concurrent_requests}.log"
 
@@ -174,9 +191,6 @@
             )
             p90_first_token_latency = np.percentile(first_token_latencies, 90)
             p95_first_token_latency = np.percentile(first_token_latencies, 95)
-            # average_first_token_inference_latency = np.mean(
-            #     first_token_inference_times
-            # )
             print(
                 f"Average first token latency: {average_first_token_latency * 1000} milliseconds.",
                 file=file,
@@ -189,10 +203,6 @@
f"P95 first token latency: {p95_first_token_latency * 1000} milliseconds.",
file=file,
)
# print(
# f"Average first token inference latency: {average_first_token_inference_latency * 1000} milliseconds.",
# file=file,
# )
print(file=file)

if next_token_latencies:
@@ -201,9 +211,6 @@
             )
             p90_next_token_latency = np.percentile(next_token_latencies, 90)
             p95_next_token_latency = np.percentile(next_token_latencies, 95)
-            # average_next_token_inference_latency = np.mean(
-            #     next_token_inference_times
-            # )
             print(
                 f"Average next token latency: {average_next_token_latency * 1000} milliseconds.",
                 file=file,
@@ -216,14 +223,10 @@
f"P95 next token latency: {p95_next_token_latency * 1000} milliseconds.",
file=file,
)
# print(
# f"Average next token inference latency: {average_next_token_inference_latency * 1000} milliseconds.",
# file=file,
# )
print(file=file)


LLM_URLS = [f"http://localhost:{PORT}/generate_stream/" for PORT in [8000]]
LLM_URLS = [f"http://localhost:{PORT}/v1/completions" for PORT in [8000]]

parser = argparse.ArgumentParser(description="Set prompt length.")
parser.add_argument(
@@ -254,17 +257,6 @@

 for MAX_CONCURRENT_REQUESTS in args.max_concurrent_requests:
     NUM_WARMUP = 5 * MAX_CONCURRENT_REQUESTS
-    NUM_REQUESTS = 10 * MAX_CONCURRENT_REQUESTS
-
-    # warm up
-    benchmark(
-        LLM_URLS,
-        PROMPT,
-        NUM_WARMUP,
-        MAX_CONCURRENT_REQUESTS,
-        MAX_TOKENS,
-        PROMPT_LENGTH,
-        is_warmup=True,
-    )
+    NUM_REQUESTS = 30 * MAX_CONCURRENT_REQUESTS
 
-    benchmark(LLM_URLS, PROMPT, NUM_REQUESTS, MAX_CONCURRENT_REQUESTS, MAX_TOKENS, PROMPT_LENGTH)
+    benchmark(LLM_URLS, PROMPT, NUM_WARMUP, NUM_REQUESTS, MAX_CONCURRENT_REQUESTS, MAX_TOKENS, PROMPT_LENGTH)
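With this change each concurrency level N issues 5·N warm-up plus 30·N measured requests in a single pass and writes its statistics to `N.log`. The argparse flag definitions are collapsed above, so invoke the script with whatever flag names they declare. If you want to spread load across several server instances, the URL list comprehension extends naturally; the benchmark already distributes requests across this list (the second port below is hypothetical):

```python
# Round-robin across multiple OpenAI-compatible workers (port 8001 assumed):
LLM_URLS = [f"http://localhost:{PORT}/v1/completions" for PORT in [8000, 8001]]
```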