Skip to content

Commit

Permalink
refactor: use NonEmptyString for creating StageMgr (databendlabs#14850)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer authored Mar 6, 2024
1 parent 8510bab commit 2ca7d52
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 34 deletions.
28 changes: 17 additions & 11 deletions src/query/management/src/stage/stage_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::NonEmptyString;
use databend_common_meta_types::Operation;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnRequest;
Expand All @@ -49,18 +50,23 @@ pub struct StageMgr {
}

impl StageMgr {
pub fn create(kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>, tenant: &str) -> Result<Self> {
if tenant.is_empty() {
return Err(ErrorCode::TenantIsEmpty(
"Tenant can not empty(while role mgr create)",
));
}

Ok(StageMgr {
pub fn create(
kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
tenant: &NonEmptyString,
) -> Self {
StageMgr {
kv_api,
stage_prefix: format!("{}/{}", USER_STAGE_API_KEY_PREFIX, escape_for_key(tenant)?),
stage_file_prefix: format!("{}/{}", STAGE_FILE_API_KEY_PREFIX, escape_for_key(tenant)?),
})
stage_prefix: format!(
"{}/{}",
USER_STAGE_API_KEY_PREFIX,
escape_for_key(tenant.as_str()).unwrap()
),
stage_file_prefix: format!(
"{}/{}",
STAGE_FILE_API_KEY_PREFIX,
escape_for_key(tenant.as_str()).unwrap()
),
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/query/management/tests/it/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_meta_app::storage::StorageParams;
use databend_common_meta_app::storage::StorageS3Config;
use databend_common_meta_embedded::MetaEmbedded;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::NonEmptyString;
use databend_common_meta_types::SeqV;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -138,7 +139,7 @@ fn create_test_stage_info() -> StageInfo {

async fn new_stage_api() -> Result<(Arc<MetaEmbedded>, StageMgr)> {
let test_api = Arc::new(MetaEmbedded::new_temp().await?);
let mgr = StageMgr::create(test_api.clone(), "admin")?;
let mgr = StageMgr::create(test_api.clone(), &NonEmptyString::new("admin").unwrap());
Ok((test_api, mgr))
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/common/grant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub async fn validate_grant_object_exists(
}
GrantObject::Stage(stage) => {
if !UserApiProvider::instance()
.exists_stage(ctx.get_tenant().as_str(), stage)
.exists_stage(&ctx.get_tenant(), stage)
.await?
{
return Err(databend_common_exception::ErrorCode::UnknownStage(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,35 @@ impl Interpreter for CreateUserStageInterpreter {

let quota_api = user_mgr.get_tenant_quota_api_client(&tenant)?;
let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data;
let stages = user_mgr.get_stages(&plan.tenant).await?;
let stages = user_mgr.get_stages(&tenant).await?;
if quota.max_stages != 0 && stages.len() >= quota.max_stages as usize {
return Err(ErrorCode::TenantQuotaExceeded(format!(
"Max stages quota exceeded {}",
quota.max_stages
)));
};

let tenant = NonEmptyString::new(plan.tenant.clone()).map_err(|_e| {
ErrorCode::TenantIsEmpty("tenant is empty when CreateUserStateInterpreter")
})?;

let old_stage = match plan.create_option {
CreateOption::CreateOrReplace => user_mgr
.get_stage(&plan.tenant, &user_stage.stage_name)
.get_stage(&tenant, &user_stage.stage_name)
.await
.ok(),
_ => None,
};

let tenant = NonEmptyString::new(plan.tenant.clone()).map_err(|_e| {
ErrorCode::TenantIsEmpty("tenant is empty when CreateUserStateInterpreter")
})?;

let mut user_stage = user_stage;
user_stage.creator = Some(self.ctx.get_current_user()?.identity());
user_stage.created_on = Utc::now();
let _ = user_mgr
.add_stage(&plan.tenant, user_stage.clone(), &plan.create_option)
.add_stage(&tenant, user_stage.clone(), &plan.create_option)
.await?;

// when create or replace stage success, if old stage is not External stage, remove stage files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ impl Interpreter for DropUserStageInterpreter {
));
}

let stage = user_mgr.get_stage(tenant.as_str(), &plan.name).await;
let stage = user_mgr.get_stage(&tenant, &plan.name).await;
user_mgr
.drop_stage(tenant.as_str(), &plan.name, plan.if_exists)
.drop_stage(&tenant, &plan.name, plan.if_exists)
.await?;

if let Ok(stage) = stage {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/servers/http/v1/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn upload_to_stage(
)
} else {
UserApiProvider::instance()
.get_stage(context.get_tenant().as_str(), &args.stage_name)
.get_stage(&context.get_tenant(), &args.stage_name)
.await
.map_err(InternalServerError)?
};
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ pub async fn resolve_stage_location(
StageInfo::new_user_stage(&ctx.get_current_user()?.name)
} else {
UserApiProvider::instance()
.get_stage(ctx.get_tenant().as_str(), names[0])
.get_stage(&ctx.get_tenant(), names[0])
.await?
};

Expand Down
4 changes: 1 addition & 3 deletions src/query/storages/system/src/stages_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ impl AsyncSystemTable for StagesTable {
_push_downs: Option<PushDownInfo>,
) -> Result<DataBlock> {
let tenant = ctx.get_tenant();
let stages = UserApiProvider::instance()
.get_stages(tenant.as_str())
.await?;
let stages = UserApiProvider::instance().get_stages(&tenant).await?;
let enable_experimental_rbac_check =
ctx.get_settings().get_enable_experimental_rbac_check()?;
let stages = if enable_experimental_rbac_check {
Expand Down
4 changes: 2 additions & 2 deletions src/query/users/src/user_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ impl UserApiProvider {
Arc::new(role_mgr)
}

pub fn get_stage_api_client(&self, tenant: &str) -> Result<Arc<dyn StageApi>> {
Ok(Arc::new(StageMgr::create(self.client.clone(), tenant)?))
pub fn stage_api(&self, tenant: &NonEmptyString) -> Arc<dyn StageApi> {
Arc::new(StageMgr::create(self.client.clone(), tenant))
}

pub fn get_file_format_api_client(&self, tenant: &str) -> Result<Arc<dyn FileFormatApi>> {
Expand Down
24 changes: 15 additions & 9 deletions src/query/users/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::StageInfo;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::NonEmptyString;

use crate::UserApiProvider;

Expand All @@ -25,23 +26,23 @@ impl UserApiProvider {
#[async_backtrace::framed]
pub async fn add_stage(
&self,
tenant: &str,
tenant: &NonEmptyString,
info: StageInfo,
create_option: &CreateOption,
) -> Result<()> {
let stage_api_provider = self.get_stage_api_client(tenant)?;
let stage_api_provider = self.stage_api(tenant);
stage_api_provider.add_stage(info, create_option).await
}

// Get one stage from by tenant.
#[async_backtrace::framed]
pub async fn get_stage(&self, tenant: &str, stage_name: &str) -> Result<StageInfo> {
let stage_api_provider = self.get_stage_api_client(tenant)?;
pub async fn get_stage(&self, tenant: &NonEmptyString, stage_name: &str) -> Result<StageInfo> {
let stage_api_provider = self.stage_api(tenant);
stage_api_provider.get_stage(stage_name).await
}

#[async_backtrace::framed]
pub async fn exists_stage(&self, tenant: &str, stage_name: &str) -> Result<bool> {
pub async fn exists_stage(&self, tenant: &NonEmptyString, stage_name: &str) -> Result<bool> {
match self.get_stage(tenant, stage_name).await {
Ok(_) => Ok(true),
Err(err) => {
Expand All @@ -56,8 +57,8 @@ impl UserApiProvider {

// Get the tenant all stage list.
#[async_backtrace::framed]
pub async fn get_stages(&self, tenant: &str) -> Result<Vec<StageInfo>> {
let stage_api_provider = self.get_stage_api_client(tenant)?;
pub async fn get_stages(&self, tenant: &NonEmptyString) -> Result<Vec<StageInfo>> {
let stage_api_provider = self.stage_api(tenant);
let get_stages = stage_api_provider.get_stages();

match get_stages.await {
Expand All @@ -68,8 +69,13 @@ impl UserApiProvider {

// Drop a stage by name.
#[async_backtrace::framed]
pub async fn drop_stage(&self, tenant: &str, name: &str, if_exists: bool) -> Result<()> {
let stage_api_provider = self.get_stage_api_client(tenant)?;
pub async fn drop_stage(
&self,
tenant: &NonEmptyString,
name: &str,
if_exists: bool,
) -> Result<()> {
let stage_api_provider = self.stage_api(tenant);
let drop_stage = stage_api_provider.drop_stage(name);
match drop_stage.await {
Ok(res) => Ok(res),
Expand Down

0 comments on commit 2ca7d52

Please sign in to comment.