
Commit

Restore previous bug fix (#85)
guoqingbao authored Aug 21, 2024
1 parent 20703f8 commit d0a1060
Showing 3 changed files with 78 additions and 70 deletions.
8 changes: 5 additions & 3 deletions kernels/src/lib.rs
@@ -1,4 +1,6 @@
-pub const COPY_BLOCKS_KERNEL: &str = include_str!(concat!(env!("OUT_DIR"), "/copy_blocks_kernel.ptx"));
+pub const COPY_BLOCKS_KERNEL: &str =
+    include_str!(concat!(env!("OUT_DIR"), "/copy_blocks_kernel.ptx"));
 pub const PAGEDATTENTION: &str = include_str!(concat!(env!("OUT_DIR"), "/pagedattention.ptx"));
-pub const RESHAPE_AND_CACHE_KERNEL: &str = include_str!(concat!(env!("OUT_DIR"), "/reshape_and_cache_kernel.ptx"));
-pub mod ffi;
+pub const RESHAPE_AND_CACHE_KERNEL: &str =
+    include_str!(concat!(env!("OUT_DIR"), "/reshape_and_cache_kernel.ptx"));
+pub mod ffi;
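
The hunk above is purely cosmetic: rustfmt rewraps the two long include_str! lines. These constants embed PTX text that the crate's build script writes into OUT_DIR at compile time (env!("OUT_DIR") is only set when a build script exists). A minimal build.rs sketch of that arrangement, where the nvcc flags and the .cu source path are illustrative assumptions rather than this repository's actual build logic:

// build.rs -- illustrative sketch only; the real build script may differ.
// Compiles a CUDA kernel to PTX in OUT_DIR so that lib.rs can embed it via
// include_str!(concat!(env!("OUT_DIR"), "/copy_blocks_kernel.ptx")).
use std::{env, path::PathBuf, process::Command};

fn main() {
    let out = PathBuf::from(env::var("OUT_DIR").unwrap()).join("copy_blocks_kernel.ptx");
    // Assumes nvcc is on PATH; the kernel source path is hypothetical.
    let status = Command::new("nvcc")
        .args(["--ptx", "kernels/copy_blocks_kernel.cu", "-o"])
        .arg(&out)
        .status()
        .expect("failed to spawn nvcc");
    assert!(status.success(), "nvcc returned a non-zero exit code");
    println!("cargo:rerun-if-changed=kernels/copy_blocks_kernel.cu");
}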
28 changes: 17 additions & 11 deletions src/openai/openai_server.rs
@@ -184,17 +184,23 @@ pub async fn chat_completions(
     let stream_request = request.stream.is_some_and(|x| x);
     let model_name = request.model.clone();
 
-    //send completion request to inference engine
-    let mut model = data.model.lock().await;
-    model.add_request(
-        token_ids,
-        request_id.clone(),
-        SystemTime::now(),
-        sampling_params,
-        request.logprobs.unwrap_or(false),
-        Some(response_tx),
-    );
-    model.notify.notify_one();
+    let _ = tokio::task::spawn_blocking(move || {
+        tokio::runtime::Handle::current().block_on(async move {
+            {
+                //send completion request to inference engine
+                let mut model = data.model.lock().await;
+                model.add_request(
+                    token_ids,
+                    request_id.clone(),
+                    SystemTime::now(),
+                    sampling_params,
+                    request.logprobs.unwrap_or(false),
+                    Some(response_tx),
+                );
+                model.notify.notify_one();
+            }
+        });
+    });
 
     if stream_request {
         ChatResponder::Streamer(
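
The restored handler stops touching the engine directly on the async request path: it hops to tokio's blocking pool with tokio::task::spawn_blocking and re-enters the runtime there via Handle::current().block_on before taking the engine lock and waking the scheduler. A minimal self-contained sketch of that hand-off, with Engine, queue, and submit as hypothetical stand-ins for the crate's real types:

// Sketch of the spawn_blocking + block_on hand-off used above.
// `Engine` and its fields are illustrative stand-ins, not this crate's API.
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};

struct Engine {
    queue: Vec<String>,
    notify: Arc<Notify>,
}

async fn submit(engine: Arc<Mutex<Engine>>, prompt: String) {
    // Fire-and-forget: the closure runs on the blocking pool, where
    // Handle::block_on is legal (it panics on an async worker thread).
    let _ = tokio::task::spawn_blocking(move || {
        tokio::runtime::Handle::current().block_on(async move {
            let mut e = engine.lock().await;
            e.queue.push(prompt);
            e.notify.notify_one(); // wake the generation loop
        });
    });
}

#[tokio::main]
async fn main() {
    let engine = Arc::new(Mutex::new(Engine {
        queue: Vec::new(),
        notify: Arc::new(Notify::new()),
    }));
    submit(engine, "hello".to_string()).await;
}

Handle::current() works inside spawn_blocking because tasks spawned from a runtime inherit its context; calling block_on directly on an async worker thread would panic instead.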
112 changes: 56 additions & 56 deletions src/openai/pipelines/llm_engine.rs
@@ -83,63 +83,63 @@ impl LLMEngine {
         }));
         let engine_clone = engine.clone();
 
-        tokio::runtime::Handle::current().block_on(async move {
-            loop {
-                notify.notified().await; // Blocking call to wait for notification
-                let _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
-                let mut e = engine.lock().await;
-                let result = e.generate_once().unwrap();
-                if result.is_empty() {
-                    continue;
-                }
-                for request_id in result.keys() {
-                    e.completion_records
-                        .insert(request_id.to_string(), result[request_id].clone());
-                }
-                finish_notify.notify_one();
-
-                //chat completion statistics
-                let overall_usage = ChatCompletionUsageResponse {
-                    request_id: "".to_string(),
-                    created: 0,
-                    completion_tokens: result
-                        .values()
-                        .map(|(_, usage)| usage.completion_tokens)
-                        .sum(),
-                    prompt_tokens: result.values().map(|(_, usage)| usage.prompt_tokens).sum(),
-                    total_tokens: result.values().map(|(_, usage)| usage.total_tokens).sum(),
-                    prompt_time_costs: result
-                        .values()
-                        .map(|(_, usage)| usage.prompt_time_costs)
-                        .max()
-                        .unwrap_or(0),
-                    completion_time_costs: result
-                        .values()
-                        .map(|(_, usage)| usage.completion_time_costs)
-                        .max()
-                        .unwrap_or(0),
-                };
-
-                println!(
-                    "\r\n [{} requests] Prefilling: {} prompt tokens processed in {} seconds",
-                    result.len(),
-                    overall_usage.prompt_tokens,
-                    overall_usage.prompt_time_costs / 1000
-                );
-
-                println!(
-                    "\r\n [{} requests] Decoding: {} tokens processed in {} seconds ({} tokens/s)",
-                    result.len(),
-                    overall_usage.completion_tokens,
-                    overall_usage.completion_time_costs / 1000,
-                    overall_usage.completion_tokens * 1000
-                        / if overall_usage.completion_time_costs > 0 {
-                            overall_usage.completion_time_costs
-                        } else {
-                            1
-                        }
-                );
-            }
-        });
+        let _ = tokio::task::spawn_blocking(move || {
+            tokio::runtime::Handle::current().block_on(async move {
+                loop {
+                    notify.notified().await; // Blocking call to wait for notification
+                    let _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
+                    let mut e = engine.lock().await;
+                    let result = e.generate_once().unwrap();
+                    if result.len() == 0 {
+                        continue;
+                    }
+                    for request_id in result.keys() {
+                        e.completion_records.insert(request_id.to_string(), result[request_id].clone());
+                    }
+                    finish_notify.notify_one();
+
+                    //chat completion statistics
+                    let overall_usage = ChatCompletionUsageResponse {
+                        request_id: "".to_string(),
+                        created: 0,
+                        completion_tokens: result.values()
+                            .map(|(_, usage)| usage.completion_tokens)
+                            .sum(),
+                        prompt_tokens: result.values().map(|(_, usage)| usage.prompt_tokens).sum(),
+                        total_tokens: result.values().map(|(_, usage)| usage.total_tokens).sum(),
+                        prompt_time_costs: result
+                            .values()
+                            .map(|(_, usage)| usage.prompt_time_costs)
+                            .max()
+                            .unwrap_or(0),
+                        completion_time_costs: result
+                            .values()
+                            .map(|(_, usage)| usage.completion_time_costs)
+                            .max()
+                            .unwrap_or(0),
+                    };
+
+                    println!(
+                        "\r\n [{} requests] Prefilling: {} prompt tokens processed in {} seconds",
+                        result.len(),
+                        overall_usage.prompt_tokens,
+                        overall_usage.prompt_time_costs / 1000
+                    );
+
+                    println!(
+                        "\r\n [{} requests] Decoding: {} tokens processed in {} seconds ({} tokens/s)",
+                        result.len(),
+                        overall_usage.completion_tokens,
+                        overall_usage.completion_time_costs / 1000,
+                        overall_usage.completion_tokens * 1000
+                            / if overall_usage.completion_time_costs > 0 {
+                                overall_usage.completion_time_costs
+                            } else {
+                                1
+                            }
+                    );
+                }
+            });
+        });
 
         Ok(engine_clone)
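
The same wrapper is restored around the engine's generation loop, which parks on notify.notified() until a request arrives, drives generate_once(), records per-request results, and prints aggregate throughput. A condensed sketch of that notify-driven loop follows; Usage, Engine, and generate_once are illustrative stand-ins for the crate's real types:

// Condensed sketch of the notify-driven generation loop above.
// `Usage`, `Engine`, and `generate_once` are illustrative stand-ins.
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, Notify};

#[derive(Clone)]
struct Usage {
    prompt_tokens: usize,
    completion_tokens: usize,
}

struct Engine {
    completion_records: HashMap<String, Usage>,
}

impl Engine {
    // Stand-in for one scheduling/generation step over queued requests.
    fn generate_once(&mut self) -> HashMap<String, Usage> {
        HashMap::new()
    }
}

async fn run_loop(engine: Arc<Mutex<Engine>>, notify: Arc<Notify>, finish_notify: Arc<Notify>) {
    loop {
        notify.notified().await; // sleep until a request arrives
        let mut e = engine.lock().await;
        let result = e.generate_once();
        if result.is_empty() {
            continue;
        }
        // Record per-request usage, then aggregate across the batch.
        for (id, usage) in &result {
            e.completion_records.insert(id.clone(), usage.clone());
        }
        finish_notify.notify_one();
        let prompt_tokens: usize = result.values().map(|u| u.prompt_tokens).sum();
        let completion_tokens: usize = result.values().map(|u| u.completion_tokens).sum();
        println!(
            "[{} requests] {} prompt tokens, {} completion tokens",
            result.len(),
            prompt_tokens,
            completion_tokens
        );
    }
}

#[tokio::main]
async fn main() {
    let engine = Arc::new(Mutex::new(Engine {
        completion_records: HashMap::new(),
    }));
    let notify = Arc::new(Notify::new());
    let finish_notify = Arc::new(Notify::new());
    tokio::spawn(run_loop(engine, notify.clone(), finish_notify));
    notify.notify_one(); // wake the loop once for demonstration
}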
